diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift index 14c5f2a7..3ef5a6fe 100644 --- a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2021-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -17,8 +17,8 @@ import NIOHTTP1 enum ControlPlaneRequest: Hashable { case next - case invocationResponse(String, ByteBuffer?) - case invocationError(String, ErrorResponse) + case invocationResponse(LambdaRequestID, ByteBuffer?) + case invocationError(LambdaRequestID, ErrorResponse) case initializationError(ErrorResponse) } @@ -29,12 +29,26 @@ enum ControlPlaneResponse: Hashable { } struct Invocation: Hashable { - let requestID: String - let deadlineInMillisSinceEpoch: Int64 - let invokedFunctionARN: String - let traceID: String - let clientContext: String? - let cognitoIdentity: String? + var requestID: String + var deadlineInMillisSinceEpoch: Int64 + var invokedFunctionARN: String + var traceID: String + var clientContext: String? + var cognitoIdentity: String? + + init(requestID: String, + deadlineInMillisSinceEpoch: Int64, + invokedFunctionARN: String, + traceID: String, + clientContext: String?, + cognitoIdentity: String?) { + self.requestID = requestID + self.deadlineInMillisSinceEpoch = deadlineInMillisSinceEpoch + self.invokedFunctionARN = invokedFunctionARN + self.traceID = traceID + self.clientContext = clientContext + self.cognitoIdentity = cognitoIdentity + } init(headers: HTTPHeaders) throws { guard let requestID = headers.first(name: AmazonHeaders.requestID), !requestID.isEmpty else { @@ -51,12 +65,16 @@ struct Invocation: Hashable { throw Lambda.RuntimeError.invocationMissingHeader(AmazonHeaders.invokedFunctionARN) } - self.requestID = requestID - self.deadlineInMillisSinceEpoch = unixTimeInMilliseconds - self.invokedFunctionARN = invokedFunctionARN - self.traceID = headers.first(name: AmazonHeaders.traceID) ?? "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=0" - self.clientContext = headers["Lambda-Runtime-Client-Context"].first - self.cognitoIdentity = headers["Lambda-Runtime-Cognito-Identity"].first + let traceID = headers.first(name: AmazonHeaders.traceID) ?? "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=0" + + self.init( + requestID: requestID, + deadlineInMillisSinceEpoch: unixTimeInMilliseconds, + invokedFunctionARN: invokedFunctionARN, + traceID: traceID, + clientContext: headers["Lambda-Runtime-Client-Context"].first, + cognitoIdentity: headers["Lambda-Runtime-Cognito-Identity"].first + ) } } diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift index a91e1e44..a8ad3b64 100644 --- a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift @@ -100,15 +100,15 @@ extension String { } extension ByteBuffer { - fileprivate mutating func writeInvocationResultRequestLine(_ requestID: String) { + fileprivate mutating func writeInvocationResultRequestLine(_ requestID: LambdaRequestID) { self.writeString("POST /2018-06-01/runtime/invocation/") - self.writeString(requestID) + self.writeRequestID(requestID) self.writeString("/response HTTP/1.1\r\n") } - fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: String) { + fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: LambdaRequestID) { self.writeString("POST /2018-06-01/runtime/invocation/") - self.writeString(requestID) + self.writeRequestID(requestID) self.writeString("/error HTTP/1.1\r\n") } diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneResponseDecoder.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneResponseDecoder.swift new file mode 100644 index 00000000..4f3a44af --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneResponseDecoder.swift @@ -0,0 +1,525 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +struct ControlPlaneResponseDecoder: NIOSingleStepByteToMessageDecoder { + typealias InboundOut = ControlPlaneResponse + + private enum State { + case waitingForNewResponse + case parsingHead(PartialHead) + case waitingForBody(PartialHead) + case receivingBody(PartialHead, ByteBuffer) + } + + private var state: State + + init() { + self.state = .waitingForNewResponse + } + + mutating func decode(buffer: inout ByteBuffer) throws -> ControlPlaneResponse? { + switch self.state { + case .waitingForNewResponse: + guard case .decoded(let head) = try self.decodeResponseHead(from: &buffer) else { + return nil + } + + guard case .decoded(let body) = try self.decodeBody(from: &buffer) else { + return nil + } + + return try self.decodeResponse(head: head, body: body) + + case .parsingHead: + guard case .decoded(let head) = try self.decodeHeaderLines(from: &buffer) else { + return nil + } + + guard case .decoded(let body) = try self.decodeBody(from: &buffer) else { + return nil + } + + return try self.decodeResponse(head: head, body: body) + + case .waitingForBody(let head), .receivingBody(let head, _): + guard case .decoded(let body) = try self.decodeBody(from: &buffer) else { + return nil + } + + return try self.decodeResponse(head: head, body: body) + } + } + + mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> ControlPlaneResponse? { + try self.decode(buffer: &buffer) + } + + // MARK: - Private Methods - + + private enum DecodeResult { + case needMoreData + case decoded(T) + } + + private mutating func decodeResponseHead(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard case .decoded = try self.decodeResponseStatusLine(from: &buffer) else { + return .needMoreData + } + + return try self.decodeHeaderLines(from: &buffer) + } + + private mutating func decodeResponseStatusLine(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard case .waitingForNewResponse = self.state else { + preconditionFailure("Invalid state: \(self.state)") + } + + guard case .decoded(var lineBuffer) = try self.decodeCRLFTerminatedLine(from: &buffer) else { + return .needMoreData + } + + let statusCode = try self.decodeStatusLine(from: &lineBuffer) + self.state = .parsingHead(.init(statusCode: statusCode)) + return .decoded(statusCode) + } + + private mutating func decodeHeaderLines(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard case .parsingHead(var head) = self.state else { + preconditionFailure("Invalid state: \(self.state)") + } + + while true { + guard case .decoded(var nextLine) = try self.decodeCRLFTerminatedLine(from: &buffer) else { + self.state = .parsingHead(head) + return .needMoreData + } + + switch try self.decodeHeaderLine(from: &nextLine) { + case .headerEnd: + self.state = .waitingForBody(head) + return .decoded(head) + + case .contentLength(let length): + head.contentLength = length // TODO: This can crash + + case .contentType: + break // switch + + case .requestID(let requestID): + head.requestID = requestID + + case .traceID(let traceID): + head.traceID = traceID + + case .functionARN(let arn): + head.invokedFunctionARN = arn + + case .cognitoIdentity(let cognitoIdentity): + head.cognitoIdentity = cognitoIdentity + + case .deadlineMS(let deadline): + head.deadlineInMillisSinceEpoch = deadline + + case .ignore: + break // switch + } + } + } + + enum BodyEncoding { + case chunked + case plain(length: Int) + case none + } + + private mutating func decodeBody(from buffer: inout ByteBuffer) throws -> DecodeResult { + switch self.state { + case .waitingForBody(let partialHead): + switch partialHead.contentLength { + case .none: + return .decoded(nil) + case .some(let length): + if let slice = buffer.readSlice(length: length) { + self.state = .waitingForNewResponse + return .decoded(slice) + } + return .needMoreData + } + + case .waitingForNewResponse, .parsingHead, .receivingBody: + preconditionFailure("Invalid state: \(self.state)") + } + } + + private mutating func decodeResponse(head: PartialHead, body: ByteBuffer?) throws -> ControlPlaneResponse { + switch head.statusCode { + case 200: + guard let body = body else { + preconditionFailure("TODO: implement") + } + return .next(try Invocation(head: head), body) + case 202: + return .accepted + case 400 ..< 600: + preconditionFailure("TODO: implement") + + default: + throw LambdaRuntimeError.unexpectedStatusCode + } + } + + mutating func decodeStatusLine(from buffer: inout ByteBuffer) throws -> Int { + guard buffer.readableBytes >= 11 else { + throw LambdaRuntimeError.responseHeadInvalidStatusLine + } + + guard buffer.readString("HTTP/1.1 ") else { + throw LambdaRuntimeError.responseHeadInvalidStatusLine + } + + let statusAsString = buffer.readString(length: 3)! + guard let status = Int(statusAsString) else { + throw LambdaRuntimeError.responseHeadInvalidStatusLine + } + + return status + } + + private mutating func decodeCRLFTerminatedLine(from buffer: inout ByteBuffer) throws -> DecodeResult { + guard let crIndex = buffer.readableBytesView.firstIndex(of: UInt8(ascii: "\r")) else { + if buffer.readableBytes > 256 { + throw LambdaRuntimeError.responseHeadMoreThan256BytesBeforeCRLF + } + return .needMoreData + } + let lfIndex = buffer.readableBytesView.index(after: crIndex) + guard lfIndex < buffer.readableBytesView.endIndex else { + // the buffer is split exactly after the \r and \n. Let's wait for more data + return .needMoreData + } + + guard buffer.readableBytesView[lfIndex] == UInt8(ascii: "\n") else { + throw LambdaRuntimeError.responseHeadInvalidHeader + } + + let slice = buffer.readSlice(length: crIndex - buffer.readerIndex)! + buffer.moveReaderIndex(forwardBy: 2) // move over \r\n + return .decoded(slice) + } + + private enum HeaderLineContent: Equatable { + case traceID(String) + case contentType + case contentLength(Int) + case cognitoIdentity(String) + case deadlineMS(Int) + case functionARN(String) + case requestID(LambdaRequestID) + + case ignore + case headerEnd + } + + private mutating func decodeHeaderLine(from buffer: inout ByteBuffer) throws -> HeaderLineContent { + guard let colonIndex = buffer.readableBytesView.firstIndex(of: UInt8(ascii: ":")) else { + if buffer.readableBytes == 0 { + return .headerEnd + } + throw LambdaRuntimeError.responseHeadHeaderMissingColon + } + + // based on colonIndex we can already make some good guesses... + // 4: Date + // 12: Content-Type + // 14: Content-Length + // 17: Transfer-Encoding + // 23: Lambda-Runtime-Trace-Id + // 26: Lambda-Runtime-Deadline-Ms + // 29: Lambda-Runtime-Aws-Request-Id + // Lambda-Runtime-Client-Context + // 31: Lambda-Runtime-Cognito-Identity + // 35: Lambda-Runtime-Invoked-Function-Arn + + switch colonIndex { + case 4: + if buffer.readHeaderName("date") { + return .ignore + } + + case 12: + if buffer.readHeaderName("content-type") { + return .ignore + } + + case 14: + if buffer.readHeaderName("content-length") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let length = buffer.readIntegerFromHeader() else { + throw LambdaRuntimeError.responseHeadInvalidContentLengthValue + } + return .contentLength(length) + } + + case 17: + if buffer.readHeaderName("transfer-encoding") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let length = buffer.readIntegerFromHeader() else { + throw LambdaRuntimeError.responseHeadInvalidDeadlineValue + } + return .contentLength(length) + } + + case 23: + if buffer.readHeaderName("lambda-runtime-trace-id") { + buffer.moveReaderIndex(forwardBy: 1) + guard let string = try self.decodeHeaderValue(from: &buffer) else { + throw LambdaRuntimeError.responseHeadInvalidTraceIDValue + } + return .traceID(string) + } + + case 26: + if buffer.readHeaderName("lambda-runtime-deadline-ms") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let deadline = buffer.readIntegerFromHeader() else { + throw LambdaRuntimeError.responseHeadInvalidContentLengthValue + } + return .deadlineMS(deadline) + } + + case 29: + if buffer.readHeaderName("lambda-runtime-aws-request-id") { + buffer.moveReaderIndex(forwardBy: 1) // move forward for colon + try self.decodeOptionalWhiteSpaceBeforeFieldValue(from: &buffer) + guard let requestID = buffer.readRequestID() else { + throw LambdaRuntimeError.responseHeadInvalidRequestIDValue + } + return .requestID(requestID) + } + if buffer.readHeaderName("lambda-runtime-client-context") { + return .ignore + } + + case 31: + if buffer.readHeaderName("lambda-runtime-cognito-identity") { + return .ignore + } + + case 35: + if buffer.readHeaderName("lambda-runtime-invoked-function-arn") { + buffer.moveReaderIndex(forwardBy: 1) + guard let string = try self.decodeHeaderValue(from: &buffer) else { + throw LambdaRuntimeError.responseHeadInvalidTraceIDValue + } + return .functionARN(string) + } + + default: + // Ensure we received a valid http header: + break // fallthrough + } + + // We received a header we didn't expect, let's ensure it is valid. + let satisfy = buffer.readableBytesView[0 ..< colonIndex].allSatisfy { char -> Bool in + switch char { + case UInt8(ascii: "a") ... UInt8(ascii: "z"), + UInt8(ascii: "A") ... UInt8(ascii: "Z"), + UInt8(ascii: "0") ... UInt8(ascii: "9"), + UInt8(ascii: "!"), + UInt8(ascii: "#"), + UInt8(ascii: "$"), + UInt8(ascii: "%"), + UInt8(ascii: "&"), + UInt8(ascii: "'"), + UInt8(ascii: "*"), + UInt8(ascii: "+"), + UInt8(ascii: "-"), + UInt8(ascii: "."), + UInt8(ascii: "^"), + UInt8(ascii: "_"), + UInt8(ascii: "`"), + UInt8(ascii: "|"), + UInt8(ascii: "~"): + return true + default: + return false + } + } + + guard satisfy else { + throw LambdaRuntimeError.responseHeadHeaderInvalidCharacter + } + + return .ignore + } + + @discardableResult + mutating func decodeOptionalWhiteSpaceBeforeFieldValue(from buffer: inout ByteBuffer) throws -> Int { + let startIndex = buffer.readerIndex + guard let index = buffer.readableBytesView.firstIndex(where: { $0 != UInt8(ascii: " ") && $0 != UInt8(ascii: "\t") }) else { + throw LambdaRuntimeError.responseHeadHeaderMissingFieldValue + } + buffer.moveReaderIndex(to: index) + return index - startIndex + } + + private func decodeHeaderValue(from buffer: inout ByteBuffer) throws -> String? { + func isNotOptionalWhiteSpace(_ val: UInt8) -> Bool { + val != UInt8(ascii: " ") && val != UInt8(ascii: "\t") + } + + guard let firstCharacterIndex = buffer.readableBytesView.firstIndex(where: isNotOptionalWhiteSpace), + let lastCharacterIndex = buffer.readableBytesView.lastIndex(where: isNotOptionalWhiteSpace) + else { + throw LambdaRuntimeError.responseHeadHeaderMissingFieldValue + } + + let string = buffer.getString(at: firstCharacterIndex, length: lastCharacterIndex + 1 - firstCharacterIndex) + buffer.moveReaderIndex(to: buffer.writerIndex) + return string + } +} + +extension ControlPlaneResponseDecoder { + fileprivate struct PartialHead { + var statusCode: Int + var contentLength: Int? + + var requestID: LambdaRequestID? + var deadlineInMillisSinceEpoch: Int? + var invokedFunctionARN: String? + var traceID: String? + var clientContext: String? + var cognitoIdentity: String? + + init(statusCode: Int) { + self.statusCode = statusCode + self.contentLength = nil + + self.requestID = nil + self.deadlineInMillisSinceEpoch = nil + self.invokedFunctionARN = nil + self.traceID = nil + self.clientContext = nil + self.cognitoIdentity = nil + } + } +} + +extension ByteBuffer { + fileprivate mutating func readString(_ string: String) -> Bool { + let result = self.withUnsafeReadableBytes { inputBuffer in + string.utf8.withContiguousStorageIfAvailable { validateBuffer -> Bool in + assert(inputBuffer.count >= validateBuffer.count) + + for idx in 0 ..< validateBuffer.count { + if inputBuffer[idx] != validateBuffer[idx] { + return false + } + } + return true + } + }! + + if result { + self.moveReaderIndex(forwardBy: string.utf8.count) + return true + } + + return false + } + + fileprivate mutating func readHeaderName(_ name: String) -> Bool { + let result = self.withUnsafeReadableBytes { inputBuffer in + name.utf8.withContiguousStorageIfAvailable { nameBuffer -> Bool in + assert(inputBuffer.count >= nameBuffer.count) + + for idx in 0 ..< nameBuffer.count { + // let's hope this gets vectorised ;) + if inputBuffer[idx] & 0xDF != nameBuffer[idx] & 0xDF { + return false + } + } + return true + } + }! + + if result { + self.moveReaderIndex(forwardBy: name.utf8.count) + return true + } + + return false + } + + mutating func readIntegerFromHeader() -> Int? { + guard let ascii = self.readInteger(as: UInt8.self), UInt8(ascii: "0") <= ascii && ascii <= UInt8(ascii: "9") else { + return nil + } + var value = Int(ascii - UInt8(ascii: "0")) + loop: while let ascii = self.readInteger(as: UInt8.self) { + switch ascii { + case UInt8(ascii: "0") ... UInt8(ascii: "9"): + value = value * 10 + value += Int(ascii - UInt8(ascii: "0")) + + case UInt8(ascii: " "), UInt8(ascii: "\t"): + // verify that all following characters are also whitespace + guard self.readableBytesView.allSatisfy({ $0 == UInt8(ascii: " ") || $0 == UInt8(ascii: "\t") }) else { + return nil + } + return value + + default: + return nil + } + } + + return value + } +} + +extension Invocation { + fileprivate init(head: ControlPlaneResponseDecoder.PartialHead) throws { + guard let requestID = head.requestID else { + throw LambdaRuntimeError.invocationHeadMissingRequestID + } + + guard let deadlineInMillisSinceEpoch = head.deadlineInMillisSinceEpoch else { + throw LambdaRuntimeError.invocationHeadMissingDeadlineInMillisSinceEpoch + } + + guard let invokedFunctionARN = head.invokedFunctionARN else { + throw LambdaRuntimeError.invocationHeadMissingFunctionARN + } + + guard let traceID = head.traceID else { + throw LambdaRuntimeError.invocationHeadMissingTraceID + } + + self = Invocation( + requestID: requestID.lowercased, + deadlineInMillisSinceEpoch: Int64(deadlineInMillisSinceEpoch), + invokedFunctionARN: invokedFunctionARN, + traceID: traceID, + clientContext: head.clientContext, + cognitoIdentity: head.cognitoIdentity + ) + } +} diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift index 1e09d867..7f12546a 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift @@ -131,7 +131,7 @@ private enum LocalLambda { guard let work = request.body else { return self.writeResponse(context: context, response: .init(status: .badRequest)) } - let requestID = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME: + let requestID = LambdaRequestID().lowercased let promise = context.eventLoop.makePromise(of: Response.self) promise.futureResult.whenComplete { result in switch result { diff --git a/Sources/AWSLambdaRuntimeCore/Lambda.swift b/Sources/AWSLambdaRuntimeCore/Lambda.swift index 1bf4f0dc..43e299e7 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda.swift @@ -34,17 +34,13 @@ public enum Lambda { /// Run a Lambda defined by implementing the ``ByteBufferLambdaHandler`` protocol. /// The Runtime will manage the Lambdas application lifecycle automatically. It will invoke the - /// ``ByteBufferLambdaHandler/makeHandler(context:)`` to create a new Handler. + /// ``ByteBufferLambdaHandler/factory(context:)`` to create a new Handler. /// /// - parameters: - /// - configuration: A Lambda runtime configuration object - /// - handlerType: The Handler to create and invoke. + /// - factory: A `ByteBufferLambdaHandler` factory. /// /// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine. - internal static func run( - configuration: Configuration = .init(), - handlerType: Handler.Type - ) -> Result { + internal static func run(configuration: Configuration = .init(), handlerType: Handler.Type) -> Result { let _run = { (configuration: Configuration) -> Result in Backtrace.install() var logger = Logger(label: "Lambda") diff --git a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift index 3c2697ff..621056e5 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift @@ -14,6 +14,11 @@ import Dispatch import NIOCore +#if canImport(Darwin) +import Darwin +#else +import Glibc +#endif // MARK: - LambdaHandler @@ -201,7 +206,27 @@ extension ByteBufferLambdaHandler { /// The lambda runtime provides a default implementation of the method that manages the launch /// process. public static func main() { + #if false _ = Lambda.run(configuration: .init(), handlerType: Self.self) + #else + + #if DEBUG + if Lambda.env("LOCAL_LAMBDA_SERVER_ENABLED").flatMap(Bool.init) ?? false { + do { + return try Lambda.withLocalServer { + NewLambdaRuntime.run(handlerType: Self.self) + } + } catch { + print(error) + exit(1) + } + } else { + NewLambdaRuntime.run(handlerType: Self.self) + } + #else + NewLambdaRuntime.run(handlerType: Self.self) + #endif + #endif } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift b/Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift index 86178ff4..031d8c3f 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift @@ -64,11 +64,6 @@ struct LambdaRequestID { private let _uuid: uuid_t - /// Returns a string representation for the `LambdaRequestID`, such as "E621E1F8-C36C-495A-93FC-0C247A3E6E5F" - var uuidString: String { - self.uppercased - } - /// Returns a lowercase string representation for the `LambdaRequestID`, such as "e621e1f8-c36c-495a-93fc-0c247a3e6e5f" var lowercased: String { var bytes = self.toAsciiBytesOnStack(characters: Self.lowercaseLookup) @@ -144,13 +139,13 @@ extension LambdaRequestID: Hashable { extension LambdaRequestID: CustomStringConvertible { var description: String { - self.uuidString + self.lowercased } } extension LambdaRequestID: CustomDebugStringConvertible { var debugDescription: String { - self.uuidString + self.lowercased } } @@ -170,7 +165,7 @@ extension LambdaRequestID: Decodable { extension LambdaRequestID: Encodable { func encode(to encoder: Encoder) throws { var container = encoder.singleValueContainer() - try container.encode(self.uuidString) + try container.encode(self.lowercased) } } @@ -346,6 +341,7 @@ extension ByteBuffer { } } + @discardableResult mutating func writeRequestID(_ requestID: LambdaRequestID) -> Int { let length = self.setRequestID(requestID, at: self.writerIndex) self.moveWriterIndex(forwardBy: length) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntime+StateMachine.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntime+StateMachine.swift new file mode 100644 index 00000000..ab5c7dd6 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntime+StateMachine.swift @@ -0,0 +1,295 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension NewLambdaRuntime { + struct Connection { + var channel: Channel + var handler: NewLambdaChannelHandler + } + + struct StateMachine { + enum Action { + case none + case createHandler(andConnection: Bool) + + case requestNextInvocation(NewLambdaChannelHandler, succeedStartPromise: EventLoopPromise?) + + case reportInvocationResult(LambdaRequestID, Result, pipelineNextInvocationRequest: Bool, NewLambdaChannelHandler) + case reportStartupError(Error, NewLambdaChannelHandler) + + case invokeHandler(Handler, Invocation, ByteBuffer) + + case failRuntime(Error, startPomise: EventLoopPromise?) + } + + private enum State { + case initialized + case starting(EventLoopPromise?) + case connected(Connection, EventLoopPromise?) + case handlerCreated(Handler, EventLoopPromise?) + case handlerCreationFailed(Error, EventLoopPromise?) + case reportingStartupError(Connection, Error, EventLoopPromise?) + + case waitingForInvocation(Connection, Handler) + case executingInvocation(Connection, Handler, LambdaRequestID) + case reportingInvocationResult(Connection, Handler, nextInvocationRequestPipelined: Bool) + + case failed(Error) + } + + private var markShutdown: Bool + private var state: State + + init() { + self.markShutdown = false + self.state = .initialized + } + + mutating func start(connection: Connection?, promise: EventLoopPromise?) -> Action { + switch self.state { + case .initialized: + if let connection = connection { + self.state = .connected(connection, promise) + return .createHandler(andConnection: false) + } + + self.state = .starting(promise) + return .createHandler(andConnection: true) + + case .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .reportingStartupError, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .failed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func handlerCreated(_ handler: Handler) -> Action { + switch self.state { + case .initialized, + .handlerCreated, + .handlerCreationFailed, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .handlerCreated(handler, promise) + return .none + + case .connected(let connection, let promise): + self.state = .waitingForInvocation(connection, handler) + return .requestNextInvocation(connection.handler, succeedStartPromise: promise) + + case .failed: + return .none + } + } + + mutating func handlerCreationFailed(_ error: Error) -> Action { + switch self.state { + case .initialized, + .handlerCreated, + .handlerCreationFailed, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .handlerCreationFailed(error, promise) + return .none + + case .connected(let connection, let promise): + self.state = .reportingStartupError(connection, error, promise) + return .reportStartupError(error, connection.handler) + + case .failed: + return .none + } + } + + mutating func httpConnectionCreated( + _ connection: Connection + ) -> Action { + switch self.state { + case .initialized, + .connected, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .connected(connection, promise) + return .none + + case .handlerCreated(let handler, let promise): + self.state = .waitingForInvocation(connection, handler) + return .requestNextInvocation(connection.handler, succeedStartPromise: promise) + + case .handlerCreationFailed(let error, let promise): + self.state = .reportingStartupError(connection, error, promise) + return .reportStartupError(error, connection.handler) + + case .failed: + return .none + } + } + + mutating func httpChannelConnectFailed(_ error: Error) -> Action { + switch self.state { + case .initialized, + .connected, + .waitingForInvocation, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .starting(let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .handlerCreated(_, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .handlerCreationFailed(let error, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .failed: + return .none + } + } + + mutating func newInvocationReceived(_ invocation: Invocation, _ body: ByteBuffer) -> Action { + switch self.state { + case .initialized, + .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .executingInvocation, + .reportingInvocationResult, + .reportingStartupError: + preconditionFailure("Invalid state: \(self.state)") + + case .waitingForInvocation(let connection, let handler): + self.state = .executingInvocation(connection, handler, LambdaRequestID(uuidString: invocation.requestID)!) + return .invokeHandler(handler, invocation, body) + + case .failed: + return .none + } + } + + mutating func acceptedReceived() -> Action { + switch self.state { + case .initialized, + .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .executingInvocation: + preconditionFailure("Invalid state: \(self.state)") + + case .waitingForInvocation: + preconditionFailure("TODO: fixme") + + case .reportingStartupError(_, let error, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .reportingInvocationResult(let connection, let handler, true): + self.state = .waitingForInvocation(connection, handler) + return .none + + case .reportingInvocationResult(let connection, let handler, false): + self.state = .waitingForInvocation(connection, handler) + return .requestNextInvocation(connection.handler, succeedStartPromise: nil) + + case .failed: + return .none + } + } + + mutating func errorResponseReceived(_ errorResponse: ErrorResponse) -> Action { + switch self.state { + case .initialized, + .starting, + .connected, + .handlerCreated, + .handlerCreationFailed, + .executingInvocation: + preconditionFailure("Invalid state: \(self.state)") + + case .waitingForInvocation: + let error = LambdaRuntimeError.controlPlaneErrorResponse(errorResponse) + self.state = .failed(error) + return .failRuntime(error, startPomise: nil) + + case .reportingStartupError(_, let error, let promise): + self.state = .failed(error) + return .failRuntime(error, startPomise: promise) + + case .reportingInvocationResult: + let error = LambdaRuntimeError.controlPlaneErrorResponse(errorResponse) + self.state = .failed(error) + return .failRuntime(error, startPomise: nil) + + case .failed: + return .none + } + } + + mutating func handlerError(_: Error) {} + + mutating func channelInactive() {} + + mutating func invocationFinished(_ result: Result) -> Action { + switch self.state { + case .initialized, + .starting, + .handlerCreated, + .handlerCreationFailed, + .connected, + .waitingForInvocation, + .reportingStartupError, + .reportingInvocationResult: + preconditionFailure("Invalid state: \(self.state)") + + case .failed: + return .none + + case .executingInvocation(let connection, let handler, let requestID): + let pipelining = true + self.state = .reportingInvocationResult(connection, handler, nextInvocationRequestPipelined: pipelining) + return .reportInvocationResult(requestID, result, pipelineNextInvocationRequest: pipelining, connection.handler) + } + } + } +} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift new file mode 100644 index 00000000..4c91c90a --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift @@ -0,0 +1,69 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +struct LambdaRuntimeError: Error, Hashable { + enum Base: Hashable { + case unsolicitedResponse + case unexpectedStatusCode + + case responseHeadInvalidStatusLine + case responseHeadMissingContentLengthOrTransferEncodingChunked + case responseHeadMoreThan256BytesBeforeCRLF + case responseHeadHeaderInvalidCharacter + case responseHeadHeaderMissingColon + case responseHeadHeaderMissingFieldValue + case responseHeadInvalidHeader + case responseHeadInvalidContentLengthValue + case responseHeadInvalidRequestIDValue + case responseHeadInvalidTraceIDValue + case responseHeadInvalidDeadlineValue + + case invocationHeadMissingRequestID + case invocationHeadMissingDeadlineInMillisSinceEpoch + case invocationHeadMissingFunctionARN + case invocationHeadMissingTraceID + + case controlPlaneErrorResponse(ErrorResponse) + } + + private let base: Base + + private init(_ base: Base) { + self.base = base + } + + static var unsolicitedResponse = LambdaRuntimeError(.unsolicitedResponse) + static var unexpectedStatusCode = LambdaRuntimeError(.unexpectedStatusCode) + static var responseHeadInvalidStatusLine = LambdaRuntimeError(.responseHeadInvalidStatusLine) + static var responseHeadMissingContentLengthOrTransferEncodingChunked = + LambdaRuntimeError(.responseHeadMissingContentLengthOrTransferEncodingChunked) + static var responseHeadMoreThan256BytesBeforeCRLF = LambdaRuntimeError(.responseHeadMoreThan256BytesBeforeCRLF) + static var responseHeadHeaderInvalidCharacter = LambdaRuntimeError(.responseHeadHeaderInvalidCharacter) + static var responseHeadHeaderMissingColon = LambdaRuntimeError(.responseHeadHeaderMissingColon) + static var responseHeadHeaderMissingFieldValue = LambdaRuntimeError(.responseHeadHeaderMissingFieldValue) + static var responseHeadInvalidHeader = LambdaRuntimeError(.responseHeadInvalidHeader) + static var responseHeadInvalidContentLengthValue = LambdaRuntimeError(.responseHeadInvalidContentLengthValue) + static var responseHeadInvalidRequestIDValue = LambdaRuntimeError(.responseHeadInvalidRequestIDValue) + static var responseHeadInvalidTraceIDValue = LambdaRuntimeError(.responseHeadInvalidTraceIDValue) + static var responseHeadInvalidDeadlineValue = LambdaRuntimeError(.responseHeadInvalidDeadlineValue) + + static var invocationHeadMissingRequestID = LambdaRuntimeError(.invocationHeadMissingRequestID) + static var invocationHeadMissingDeadlineInMillisSinceEpoch = LambdaRuntimeError(.invocationHeadMissingDeadlineInMillisSinceEpoch) + static var invocationHeadMissingFunctionARN = LambdaRuntimeError(.invocationHeadMissingFunctionARN) + static var invocationHeadMissingTraceID = LambdaRuntimeError(.invocationHeadMissingTraceID) + + static func controlPlaneErrorResponse(_ response: ErrorResponse) -> Self { + LambdaRuntimeError(.controlPlaneErrorResponse(response)) + } +} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaChannelHandler.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaChannelHandler.swift new file mode 100644 index 00000000..128ec5cc --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaChannelHandler.swift @@ -0,0 +1,82 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +protocol LambdaChannelHandlerDelegate { + func responseReceived(_: ControlPlaneResponse) + + func errorCaught(_: Error) + + func channelInactive() +} + +final class NewLambdaChannelHandler: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + typealias OutboundOut = ByteBuffer + + private let delegate: Delegate + private var requestsInFlight: CircularBuffer + + private var context: ChannelHandlerContext! + + private var encoder: ControlPlaneRequestEncoder + private var decoder: NIOSingleStepByteToMessageProcessor + + init(delegate: Delegate, host: String) { + self.delegate = delegate + self.requestsInFlight = CircularBuffer(initialCapacity: 4) + + self.encoder = ControlPlaneRequestEncoder(host: host) + self.decoder = NIOSingleStepByteToMessageProcessor(ControlPlaneResponseDecoder(), maximumBufferSize: 7 * 1024 * 1024) + } + + func sendRequest(_ request: ControlPlaneRequest) { + self.requestsInFlight.append(request) + self.encoder.writeRequest(request, context: self.context, promise: nil) + } + + func handlerAdded(context: ChannelHandlerContext) { + self.context = context + self.encoder.writerAdded(context: context) + } + + func handlerRemoved(context: ChannelHandlerContext) { + self.context = context + self.encoder.writerRemoved(context: context) + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + do { + let buffer = self.unwrapInboundIn(data) + try self.decoder.process(buffer: buffer) { response in + guard self.requestsInFlight.popFirst() != nil else { + throw LambdaRuntimeError.unsolicitedResponse + } + + self.delegate.responseReceived(response) + } + } catch { + self.delegate.errorCaught(error) + } + } + + func channelInactive(context: ChannelHandlerContext) { + self.delegate.channelInactive() + } + + func errorCaught(context: ChannelHandlerContext, error: Error) { + self.delegate.errorCaught(error) + } +} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift new file mode 100644 index 00000000..4f84808f --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift @@ -0,0 +1,305 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Backtrace +import Logging +import NIOConcurrencyHelpers +import NIOCore +import NIOPosix + +#if canImport(Glibc) +import Glibc +#endif + +/// `LambdaRuntime` manages the Lambda process lifecycle. +/// +/// - note: All state changes are dispatched onto the supplied EventLoop. +public final class NewLambdaRuntime { + private let eventLoop: EventLoop + private let shutdownPromise: EventLoopPromise + private let logger: Logger + private let configuration: Lambda.Configuration + + private var state: StateMachine + + init(eventLoop: EventLoop, + logger: Logger, + configuration: Lambda.Configuration, + handlerType: Handler.Type) { + self.state = StateMachine() + self.eventLoop = eventLoop + self.shutdownPromise = eventLoop.makePromise(of: Void.self) + self.logger = logger + self.configuration = configuration + } + + deinit { + // TODO: Verify is shutdown + } + + /// The `Lifecycle` shutdown future. + /// + /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda lifecycle has fully shutdown. + public var shutdownFuture: EventLoopFuture { + self.shutdownPromise.futureResult + } + + /// Start the `LambdaRuntime`. + /// + /// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda hander has been created + /// and initiliazed, and a first run has been scheduled. + public func start() -> EventLoopFuture { + let promise = self.eventLoop.makePromise(of: Void.self) + self.start(promise: promise) + return promise.futureResult + } + + public func start(promise: EventLoopPromise?) { + if self.eventLoop.inEventLoop { + self.start0(promise: promise) + } else { + self.eventLoop.execute { + self.start0(promise: promise) + } + } + } + + public func __testOnly_start(channel: Channel, promise: EventLoopPromise?) { + precondition(channel.eventLoop === self.eventLoop, "Channel must be created on the supplied EventLoop.") + if self.eventLoop.inEventLoop { + self.__testOnly_start0(channel: channel, promise: promise) + } else { + self.eventLoop.execute { + self.__testOnly_start0(channel: channel, promise: promise) + } + } + } + + /// Begin the `LambdaRuntime` shutdown. Only needed for debugging purposes, hence behind a `DEBUG` flag. + public func shutdown(promise: EventLoopPromise?) { + if self.eventLoop.inEventLoop { + self.shutdown0(promise: promise) + } else { + self.eventLoop.execute { + self.shutdown0(promise: promise) + } + } + } + + // MARK: - Private - + + private func start0(promise: EventLoopPromise?) { + self.eventLoop.assertInEventLoop() + + // when starting we want to do thing in parallel: + // 1. start the connection to the control plane + // 2. create the lambda handler + + self.logger.debug("initializing lambda") + + let action = self.state.start(connection: nil, promise: promise) + self.run(action) + } + + private func shutdown0(promise: EventLoopPromise?) {} + + private func __testOnly_start0(channel: Channel, promise: EventLoopPromise?) { + channel.eventLoop.preconditionInEventLoop() + assert(channel.isActive) + + do { + let connection = try self.setupConnection(channel: channel) + let action = self.state.start(connection: connection, promise: promise) + self.run(action) + } catch { + promise?.fail(error) + } + } + + private func run(_ action: StateMachine.Action) { + switch action { + case .createHandler(andConnection: let andConnection): + self.createHandler() + if andConnection { + self.createConnection() + } + + case .invokeHandler(let handler, let invocation, let event): + self.logger.trace("invoking handler") + let context = LambdaContext( + logger: self.logger, + eventLoop: self.eventLoop, + allocator: .init(), + invocation: invocation + ) + handler.handle(event, context: context).whenComplete { result in + let action = self.state.invocationFinished(result) + self.run(action) + } + + case .failRuntime(let error, let startPromise): + startPromise?.fail(error) + self.shutdownPromise.fail(error) + + case .requestNextInvocation(let handler, let startPromise): + self.logger.trace("requesting next invocation") + handler.sendRequest(.next) + startPromise?.succeed(()) + + case .reportInvocationResult(let requestID, let result, let pipelineNextInvocationRequest, let handler): + switch result { + case .success(let body): + self.logger.trace("reporting invocation success", metadata: [ + "lambda-request-id": "\(requestID)", + ]) + handler.sendRequest(.invocationResponse(requestID, body)) + + case .failure(let error): + self.logger.trace("reporting invocation failure", metadata: [ + "lambda-request-id": "\(requestID)", + ]) + let errorString = String(describing: error) + let errorResponse = ErrorResponse(errorType: errorString, errorMessage: errorString) + handler.sendRequest(.invocationError(requestID, errorResponse)) + } + + if pipelineNextInvocationRequest { + handler.sendRequest(.next) + } + + case .reportStartupError(let error, let handler): + let errorString = String(describing: error) + handler.sendRequest(.initializationError(.init(errorType: errorString, errorMessage: errorString))) + + case .none: + break + } + } + + private func createConnection() { + let connectFuture = ClientBootstrap(group: self.eventLoop).connect( + host: self.configuration.runtimeEngine.ip, + port: self.configuration.runtimeEngine.port + ) + + connectFuture.whenComplete { result in + let action: StateMachine.Action + switch result { + case .success(let channel): + do { + let connection = try self.setupConnection(channel: channel) + action = self.state.httpConnectionCreated(connection) + } catch { + action = self.state.httpChannelConnectFailed(error) + } + case .failure(let error): + action = self.state.httpChannelConnectFailed(error) + } + self.run(action) + } + } + + private func setupConnection(channel: Channel) throws -> Connection { + let handler = NewLambdaChannelHandler(delegate: self, host: self.configuration.runtimeEngine.ip) + try channel.pipeline.syncOperations.addHandler(handler) + return Connection(channel: channel, handler: handler) + } + + private func createHandler() { + let context = Lambda.InitializationContext( + logger: self.logger, + eventLoop: self.eventLoop, + allocator: ByteBufferAllocator() + ) + + Handler.makeHandler(context: context).hop(to: self.eventLoop).whenComplete { result in + let action: StateMachine.Action + switch result { + case .success(let handler): + action = self.state.handlerCreated(handler) + case .failure(let error): + action = self.state.handlerCreationFailed(error) + } + self.run(action) + } + } +} + +extension NewLambdaRuntime: LambdaChannelHandlerDelegate { + func responseReceived(_ response: ControlPlaneResponse) { + let action: StateMachine.Action + switch response { + case .next(let invocation, let byteBuffer): + action = self.state.newInvocationReceived(invocation, byteBuffer) + + case .accepted: + action = self.state.acceptedReceived() + + case .error(let errorResponse): + action = self.state.errorResponseReceived(errorResponse) + } + + self.run(action) + } + + func errorCaught(_ error: Error) { + self.state.handlerError(error) + } + + func channelInactive() { + self.state.channelInactive() + } +} + +extension NewLambdaRuntime { + static func run(handlerType: Handler.Type) { + Backtrace.install() + + let configuration = Lambda.Configuration() + var logger = Logger(label: "Lambda") + logger.logLevel = configuration.general.logLevel + + MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in + let runtime = NewLambdaRuntime( + eventLoop: eventLoop, + logger: logger, + configuration: configuration, + handlerType: Handler.self + ) + + logger.info("lambda runtime starting with \(configuration)") + + #if DEBUG + let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in + logger.info("intercepted signal: \(signal)") + runtime.shutdown(promise: nil) + } + #endif + + runtime.start().flatMap { + runtime.shutdownFuture + }.whenComplete { _ in + #if DEBUG + signalSource.cancel() + #endif + eventLoop.shutdownGracefully { error in + if let error = error { + preconditionFailure("Failed to shutdown eventloop: \(error)") + } + logger.info("shutdown completed") + } + } + } + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift index ac6c0838..2d10094b 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift @@ -54,7 +54,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { } func testPostInvocationSuccessWithoutBody() { - let requestID = UUID().uuidString + let requestID = LambdaRequestID() var request: NIOHTTPServerRequestFull? XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, nil))) @@ -70,7 +70,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { } func testPostInvocationSuccessWithBody() { - let requestID = UUID().uuidString + let requestID = LambdaRequestID() let payload = ByteBuffer(string: "hello swift lambda!") var request: NIOHTTPServerRequestFull? @@ -89,7 +89,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { } func testPostInvocationErrorWithBody() { - let requestID = UUID().uuidString + let requestID = LambdaRequestID() let error = ErrorResponse(errorType: "SomeError", errorMessage: "An error happened") var request: NIOHTTPServerRequestFull? XCTAssertNoThrow(request = try self.sendRequest(.invocationError(requestID, error))) @@ -137,7 +137,7 @@ final class ControlPlaneRequestEncoderTests: XCTestCase { XCTAssertEqual(nextRequest?.head.method, .GET) XCTAssertEqual(nextRequest?.head.uri, "/2018-06-01/runtime/invocation/next") - let requestID = UUID().uuidString + let requestID = LambdaRequestID() let payload = ByteBuffer(string: "hello swift lambda!") var successRequest: NIOHTTPServerRequestFull? XCTAssertNoThrow(successRequest = try self.sendRequest(.invocationResponse(requestID, payload))) diff --git a/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneResponseDecoderTests.swift b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneResponseDecoderTests.swift new file mode 100644 index 00000000..27cea84c --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneResponseDecoderTests.swift @@ -0,0 +1,344 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import NIOCore +import NIOTestUtils +import XCTest + +final class ControlPlaneResponseDecoderTests: XCTestCase { + func testNextAndAcceptedResponse() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Aws-Request-Id: 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + let invocation = Invocation( + requestID: "9028dc49-a01b-4b44-8ffe-4912e9dabbbd", + deadlineInMillisSinceEpoch: 1_638_392_696_671, + invokedFunctionARN: "arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x", + traceID: "Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0", + clientContext: nil, + cognitoIdentity: nil + ) + let next: ControlPlaneResponse = .next(invocation, ByteBuffer(string: #"{"name":"Fabian","key2":"value2","key3":"value3"}"#)) + + let acceptedResponse = ByteBuffer(string: """ + HTTP/1.1 202 Accepted\r\n\ + Content-Type: application/json\r\n\ + Date: Sun, 05 Dec 2021 11:53:40 GMT\r\n\ + Content-Length: 16\r\n\ + \r\n\ + {"status":"OK"}\n + """ + ) + + let pairs: [(ByteBuffer, [ControlPlaneResponse])] = [ + (nextResponse, [next]), + (acceptedResponse, [.accepted]), + (nextResponse + acceptedResponse, [next, .accepted]), + ] + + XCTAssertNoThrow(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: pairs, + decoderFactory: { ControlPlaneResponseDecoder() } + )) + } + + func testWhitespaceInHeaderIsRejected() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime Aws-Request-Id: 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadHeaderInvalidCharacter) + } + } + + func testVeryLongHTTPStatusLine() { + let nextResponse = ByteBuffer(repeating: UInt8(ascii: "H"), count: 1024) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadMoreThan256BytesBeforeCRLF) + } + } + + func testVeryLongHTTPHeader() { + let acceptedResponse = ByteBuffer(string: """ + HTTP/1.1 202 Accepted\r\n\ + Content-Type: application/json\r\n\ + Date: Sun, 05 Dec 2021 11:53:40 GMT\r\n\ + Content-Length: 16\r\n + """ + ) + ByteBuffer(repeating: UInt8(ascii: "H"), count: 1024) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(acceptedResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadMoreThan256BytesBeforeCRLF) + } + } + + func testNextResponseWithoutTraceID() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Aws-Request-Id: 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .invocationHeadMissingTraceID) + } + } + + func testNextResponseWithoutRequestID() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .invocationHeadMissingRequestID) + } + } + + func testNextResponseWithInvalidStatusCode() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 20 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidStatusLine) + } + } + + func testNextResponseWithVersionHTTP2() { + let nextResponse = ByteBuffer(string: """ + HTTP/2.0 200 OK\r\n\ + Content-Type: application/json\r\n\ + Lambda-Runtime-Deadline-Ms: 1638392696671\r\n\ + Lambda-Runtime-Invoked-Function-Arn: arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\r\n\ + Lambda-Runtime-Trace-Id: Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + Content-Length: 49\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidStatusLine) + } + } + + func testNextResponseLeadingAndTrailingWhitespaceHeaders() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \t \t application/json\t \t \r\n\ + Lambda-Runtime-Aws-Request-Id: \t \t 9028dc49-a01b-4b44-8ffe-4912e9dabbbd\t \t \r\n\ + Lambda-Runtime-Deadline-Ms: \t \t 1638392696671\t \t \r\n\ + Lambda-Runtime-Invoked-Function-Arn: \t \t arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x\t \t \r\n\ + Lambda-Runtime-Trace-Id: \t \t Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0\t \t \r\n\ + Date: \t \t Wed, 01 Dec 2021 21:04:53 GMT\t \t \r\n\ + Content-Length: \t \t 49\t \t \r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + let invocation = Invocation( + requestID: "9028dc49-a01b-4b44-8ffe-4912e9dabbbd", + deadlineInMillisSinceEpoch: 1_638_392_696_671, + invokedFunctionARN: "arn:aws:lambda:eu-central-1:000000000000:function:lambda-log-http-HelloWorldLambda-NiDlzMFXtF3x", + traceID: "Root=1-61a7e375-40b3edf95b388fe75d1fa416;Parent=348bb48e251c1254;Sampled=0", + clientContext: nil, + cognitoIdentity: nil + ) + let next: ControlPlaneResponse = .next(invocation, ByteBuffer(string: #"{"name":"Fabian","key2":"value2","key3":"value3"}"#)) + + XCTAssertNoThrow(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [next])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) + } + + func testContentLengthHasTrailingCharacterSurroundedByWhitespace() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \t \t application/json\t \t \r\n\ + Content-Length: 49 r \r\n\ + Date: \t \t Wed, 01 Dec 2021 21:04:53 GMT\t \t \r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidContentLengthValue) + } + } + + func testInvalidContentLength() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \t \t application/json\t \t \r\n\ + Content-Length: 4u9 \r\n\ + Date: \t \t Wed, 01 Dec 2021 21:04:53 GMT\t \t \r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidContentLengthValue) + } + } + + func testResponseHeaderWithoutColon() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type application/json\r\n\ + Content-Length: 49\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadHeaderMissingColon) + } + } + + func testResponseHeaderWithDoubleCR() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\r\n\ + Content-Length: 49\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + XCTAssertEqual($0 as? LambdaRuntimeError, .responseHeadInvalidHeader) + } + } + + func testResponseHeaderWithoutValue() { + let nextResponse = ByteBuffer(string: """ + HTTP/1.1 200 OK\r\n\ + Content-Type: \r\n\ + Content-Length: 49\r\n\ + Date: Wed, 01 Dec 2021 21:04:53 GMT\r\n\ + \r\n\ + {"name":"Fabian","key2":"value2","key3":"value3"} + """ + ) + + XCTAssertThrowsError(try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [(nextResponse, [])], + decoderFactory: { ControlPlaneResponseDecoder() } + )) { + // TODO: This should return an invalid header function error (.responseHeadInvalidHeader) + XCTAssertEqual($0 as? LambdaRuntimeError, .invocationHeadMissingRequestID) + } + } +} + +extension ByteBuffer { + static func + (lhs: Self, rhs: Self) -> ByteBuffer { + var new = lhs + var rhs = rhs + new.writeBuffer(&rhs) + return new + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift index 7849fe09..e3d89dae 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRequestIDTests.swift @@ -24,7 +24,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID = buffer.readRequestID() XCTAssertEqual(buffer.readerIndex, 36) XCTAssertEqual(buffer.readableBytes, 0) - XCTAssertEqual(requestID?.uuidString, UUID(uuidString: string)?.uuidString) + XCTAssertEqual(requestID?.uppercased, UUID(uuidString: string)?.uuidString) XCTAssertEqual(requestID?.uppercased, string) } @@ -35,7 +35,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID = originalBuffer.readRequestID() XCTAssertEqual(originalBuffer.readerIndex, 36) XCTAssertEqual(originalBuffer.readableBytes, 0) - XCTAssertEqual(requestID?.uuidString, UUID(uuidString: string)?.uuidString) + XCTAssertEqual(requestID?.uppercased, UUID(uuidString: string)?.uuidString) XCTAssertEqual(requestID?.lowercased, string) var newBuffer = ByteBuffer() @@ -109,7 +109,7 @@ final class LambdaRequestIDTest: XCTestCase { // achieve this though at the moment // XCTAssertFalse((nsString as String).isContiguousUTF8) let requestID = LambdaRequestID(uuidString: nsString as String) - XCTAssertEqual(requestID?.uuidString, LambdaRequestID(uuidString: nsString as String)?.uuidString) + XCTAssertEqual(requestID?.lowercased, LambdaRequestID(uuidString: nsString as String)?.lowercased) XCTAssertEqual(requestID?.uppercased, nsString as String) } @@ -121,10 +121,10 @@ final class LambdaRequestIDTest: XCTestCase { func testDescription() { let requestID = LambdaRequestID() - let fduuid = UUID(uuid: requestID.uuid) + let uuid = UUID(uuid: requestID.uuid) - XCTAssertEqual(fduuid.description, requestID.description) - XCTAssertEqual(fduuid.debugDescription, requestID.debugDescription) + XCTAssertEqual(uuid.description.lowercased(), requestID.description) + XCTAssertEqual(uuid.debugDescription.lowercased(), requestID.debugDescription) } func testFoundationInteropFromFoundation() { @@ -190,7 +190,7 @@ final class LambdaRequestIDTest: XCTestCase { var data: Data? XCTAssertNoThrow(data = try JSONEncoder().encode(test)) - XCTAssertEqual(try String(decoding: XCTUnwrap(data), as: Unicode.UTF8.self), #"{"requestID":"\#(requestID.uuidString)"}"#) + XCTAssertEqual(try String(decoding: XCTUnwrap(data), as: Unicode.UTF8.self), #"{"requestID":"\#(requestID.lowercased)"}"#) } func testDecodingSuccess() { @@ -198,7 +198,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID: LambdaRequestID } let requestID = LambdaRequestID() - let data = #"{"requestID":"\#(requestID.uuidString)"}"#.data(using: .utf8) + let data = #"{"requestID":"\#(requestID.lowercased)"}"#.data(using: .utf8) var result: Test? XCTAssertNoThrow(result = try JSONDecoder().decode(Test.self, from: XCTUnwrap(data))) @@ -210,7 +210,7 @@ final class LambdaRequestIDTest: XCTestCase { let requestID: LambdaRequestID } let requestID = LambdaRequestID() - var requestIDString = requestID.uuidString + var requestIDString = requestID.lowercased _ = requestIDString.removeLast() let data = #"{"requestID":"\#(requestIDString)"}"#.data(using: .utf8) diff --git a/Tests/AWSLambdaRuntimeCoreTests/NewLambdaChannelHandlerTests.swift b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaChannelHandlerTests.swift new file mode 100644 index 00000000..f2c9fc33 --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaChannelHandlerTests.swift @@ -0,0 +1,212 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import NIOCore +import NIOEmbedded +import NIOHTTP1 +import XCTest + +final class NewLambdaChannelHandlerTests: XCTestCase { + let host = "192.168.0.1" + + var delegate: EmbeddedLambdaChannelHandlerDelegate! + var handler: NewLambdaChannelHandler! + var client: EmbeddedChannel! + var server: EmbeddedChannel! + + override func setUp() { + self.delegate = EmbeddedLambdaChannelHandlerDelegate() + self.handler = NewLambdaChannelHandler(delegate: self.delegate, host: "127.0.0.1") + + self.client = EmbeddedChannel(handler: self.handler) + self.server = EmbeddedChannel(handlers: [ + NIOHTTPServerRequestAggregator(maxContentLength: 1024 * 1024), + ]) + + XCTAssertNoThrow(try self.server.pipeline.syncOperations.configureHTTPServerPipeline(position: .first)) + + XCTAssertNoThrow(try self.server.bind(to: .init(ipAddress: "127.0.0.1", port: 0), promise: nil)) + XCTAssertNoThrow(try self.client.connect(to: .init(ipAddress: "127.0.0.1", port: 0), promise: nil)) + } + + func testPipelineRequests() { + self.handler.sendRequest(.next) + + self.assertInteract() + + var nextRequest: NIOHTTPServerRequestFull? + XCTAssertNoThrow(nextRequest = try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + XCTAssertEqual(nextRequest?.head.uri, "/2018-06-01/runtime/invocation/next") + XCTAssertEqual(nextRequest?.head.method, .GET) + + XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + + let requestID = LambdaRequestID() + let traceID = "foo" + let functionARN = "arn" + let deadline = UInt(Date().timeIntervalSince1970 * 1000) + 3000 + let requestBody = ByteBuffer(string: "foo bar") + + XCTAssertNoThrow(try self.server.writeOutboundInvocation( + requestID: requestID, + traceID: traceID, + functionARN: functionARN, + deadline: deadline, + body: requestBody + )) + + self.assertInteract() + + var response: (Invocation, ByteBuffer)? + XCTAssertNoThrow(response = try self.delegate.readNextResponse()) + + XCTAssertEqual(response?.0.requestID, requestID.lowercased) + XCTAssertEqual(response?.0.traceID, traceID) + XCTAssertEqual(response?.0.invokedFunctionARN, functionARN) + XCTAssertEqual(response?.0.deadlineInMillisSinceEpoch, Int64(deadline)) + XCTAssertEqual(response?.1, requestBody) + + let responseBody = ByteBuffer(string: "hello world") + + self.handler.sendRequest(.invocationResponse(requestID, responseBody)) + + self.assertInteract() + + var responseRequest: NIOHTTPServerRequestFull? + XCTAssertNoThrow(responseRequest = try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + XCTAssertEqual(responseRequest?.head.uri, "/2018-06-01/runtime/invocation/\(requestID.lowercased)/response") + XCTAssertEqual(responseRequest?.head.method, .POST) + XCTAssertEqual(responseRequest?.body, responseBody) + } + + func assertInteract(file: StaticString = #file, line: UInt = #line) { + XCTAssertNoThrow(try { + while let clientBuffer = try self.client.readOutbound(as: ByteBuffer.self) { + try self.server.writeInbound(clientBuffer) + } + + while let serverBuffer = try self.server.readOutbound(as: ByteBuffer.self) { + try self.client.writeInbound(serverBuffer) + } + }(), file: file, line: line) + } +} + +final class EmbeddedLambdaChannelHandlerDelegate: LambdaChannelHandlerDelegate { + enum Error: Swift.Error { + case missingEvent + case wrongEventType + case wrongResponseType + } + + private enum Event { + case channelInactive + case error(Swift.Error) + case response(ControlPlaneResponse) + } + + private var events: CircularBuffer + + init() { + self.events = CircularBuffer(initialCapacity: 8) + } + + func channelInactive() { + self.events.append(.channelInactive) + } + + func errorCaught(_ error: Swift.Error) { + self.events.append(.error(error)) + } + + func responseReceived(_ response: ControlPlaneResponse) { + self.events.append(.response(response)) + } + + func readResponse() throws -> ControlPlaneResponse { + guard case .response(let response) = try self.popNextEvent() else { + throw Error.wrongEventType + } + return response + } + + func readNextResponse() throws -> (Invocation, ByteBuffer) { + guard case .next(let invocation, let body) = try self.readResponse() else { + throw Error.wrongResponseType + } + return (invocation, body) + } + + func assertAcceptResponse() throws { + guard case .accepted = try self.readResponse() else { + throw Error.wrongResponseType + } + } + + func readErrorResponse() throws -> ErrorResponse { + guard case .error(let errorResponse) = try self.readResponse() else { + throw Error.wrongResponseType + } + return errorResponse + } + + func readError() throws -> Swift.Error { + guard case .error(let error) = try self.popNextEvent() else { + throw Error.wrongEventType + } + return error + } + + func assertChannelInactive() throws { + guard case .channelInactive = try self.popNextEvent() else { + throw Error.wrongEventType + } + } + + private func popNextEvent() throws -> Event { + guard let event = self.events.popFirst() else { + throw Error.missingEvent + } + return event + } +} + +extension EmbeddedChannel { + func writeOutboundInvocation( + requestID: LambdaRequestID = LambdaRequestID(), + traceID: String = "Root=\(DispatchTime.now().uptimeNanoseconds);Parent=\(DispatchTime.now().uptimeNanoseconds);Sampled=1", + functionARN: String = "", + deadline: UInt = UInt(Date().timeIntervalSince1970 * 1000) + 3000, + body: ByteBuffer? + ) throws { + let head = HTTPResponseHead( + version: .http1_1, + status: .ok, + headers: [ + "content-length": "\(body?.readableBytes ?? 0)", + "lambda-runtime-deadline-ms": "\(deadline)", + "lambda-runtime-trace-id": "\(traceID)", + "lambda-runtime-aws-request-id": "\(requestID)", + "lambda-runtime-invoked-function-arn": "\(functionARN)", + ] + ) + + try self.writeOutbound(HTTPServerResponsePart.head(head)) + if let body = body { + try self.writeOutbound(HTTPServerResponsePart.body(.byteBuffer(body))) + } + try self.writeOutbound(HTTPServerResponsePart.end(nil)) + } +} diff --git a/scripts/soundness.sh b/scripts/soundness.sh index d9145903..603ab19a 100755 --- a/scripts/soundness.sh +++ b/scripts/soundness.sh @@ -19,7 +19,7 @@ here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" function replace_acceptable_years() { # this needs to replace all acceptable forms with 'YEARS' - sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2017-2021/YEARS/' -e 's/2020-2021/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/' -e 's/2021/YEARS/' -e 's/2022/YEARS/' + sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2017-2021/YEARS/' -e 's/2020-2021/YEARS/' -e 's/2021-2022/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/' -e 's/2021/YEARS/' -e 's/2022/YEARS/' } printf "=> Checking for unacceptable language... "