diff --git a/Sources/GRPCCore/Internal/Base64.swift b/Sources/GRPCCore/Internal/Base64.swift index 6265b996b..f2078331f 100644 --- a/Sources/GRPCCore/Internal/Base64.swift +++ b/Sources/GRPCCore/Internal/Base64.swift @@ -26,11 +26,11 @@ met: 1. Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. + notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED @@ -70,10 +70,9 @@ SOFTWARE. */ -internal enum Base64 {} - -extension Base64 { - internal struct DecodingOptions: OptionSet { +// swift-format-ignore: DontRepeatTypeInStaticProperties +enum Base64 { + struct DecodingOptions: OptionSet { internal let rawValue: UInt internal init(rawValue: UInt) { self.rawValue = rawValue } @@ -81,14 +80,49 @@ extension Base64 { internal static let omitPaddingCharacter = DecodingOptions(rawValue: UInt(1 << 1)) } - internal enum DecodingError: Error, Equatable { + enum DecodingError: Error, Equatable { case invalidLength case invalidCharacter(UInt8) case unexpectedPaddingCharacter case unexpectedEnd } - internal static func decode( + static func encode(bytes: Buffer) -> String where Buffer.Element == UInt8 { + guard !bytes.isEmpty else { + return "" + } + + // In Base64, 3 bytes become 4 output characters, and we pad to the + // nearest multiple of four. + let base64StringLength = ((bytes.count + 2) / 3) * 4 + let alphabet = Base64.encodeBase64 + + return String(customUnsafeUninitializedCapacity: base64StringLength) { backingStorage in + var input = bytes.makeIterator() + var offset = 0 + while let firstByte = input.next() { + let secondByte = input.next() + let thirdByte = input.next() + + backingStorage[offset] = Base64.encode(alphabet: alphabet, firstByte: firstByte) + backingStorage[offset + 1] = Base64.encode( + alphabet: alphabet, + firstByte: firstByte, + secondByte: secondByte + ) + backingStorage[offset + 2] = Base64.encode( + alphabet: alphabet, + secondByte: secondByte, + thirdByte: thirdByte + ) + backingStorage[offset + 3] = Base64.encode(alphabet: alphabet, thirdByte: thirdByte) + offset += 4 + } + return offset + } + } + + static func decode( string encoded: String, options: DecodingOptions = [] ) throws -> [UInt8] { @@ -204,7 +238,7 @@ extension Base64 { } } - static func withUnsafeDecodingTablesAsBufferPointers( + private static func withUnsafeDecodingTablesAsBufferPointers( options: Base64.DecodingOptions, _ body: ( UnsafeBufferPointer, UnsafeBufferPointer, UnsafeBufferPointer, @@ -232,10 +266,64 @@ extension Base64 { } } - internal static let encodePaddingCharacter: UInt8 = 61 - static let badCharacter: UInt32 = 0x01FF_FFFF + private static let encodePaddingCharacter: UInt8 = 61 + + private static let encodeBase64: [UInt8] = [ + UInt8(ascii: "A"), UInt8(ascii: "B"), UInt8(ascii: "C"), UInt8(ascii: "D"), + UInt8(ascii: "E"), UInt8(ascii: "F"), UInt8(ascii: "G"), UInt8(ascii: "H"), + UInt8(ascii: "I"), UInt8(ascii: "J"), UInt8(ascii: "K"), UInt8(ascii: "L"), + UInt8(ascii: "M"), UInt8(ascii: "N"), UInt8(ascii: "O"), UInt8(ascii: "P"), + UInt8(ascii: "Q"), UInt8(ascii: "R"), UInt8(ascii: "S"), UInt8(ascii: "T"), + UInt8(ascii: "U"), UInt8(ascii: "V"), UInt8(ascii: "W"), UInt8(ascii: "X"), + UInt8(ascii: "Y"), UInt8(ascii: "Z"), UInt8(ascii: "a"), UInt8(ascii: "b"), + UInt8(ascii: "c"), UInt8(ascii: "d"), UInt8(ascii: "e"), UInt8(ascii: "f"), + UInt8(ascii: "g"), UInt8(ascii: "h"), UInt8(ascii: "i"), UInt8(ascii: "j"), + UInt8(ascii: "k"), UInt8(ascii: "l"), UInt8(ascii: "m"), UInt8(ascii: "n"), + UInt8(ascii: "o"), UInt8(ascii: "p"), UInt8(ascii: "q"), UInt8(ascii: "r"), + UInt8(ascii: "s"), UInt8(ascii: "t"), UInt8(ascii: "u"), UInt8(ascii: "v"), + UInt8(ascii: "w"), UInt8(ascii: "x"), UInt8(ascii: "y"), UInt8(ascii: "z"), + UInt8(ascii: "0"), UInt8(ascii: "1"), UInt8(ascii: "2"), UInt8(ascii: "3"), + UInt8(ascii: "4"), UInt8(ascii: "5"), UInt8(ascii: "6"), UInt8(ascii: "7"), + UInt8(ascii: "8"), UInt8(ascii: "9"), UInt8(ascii: "+"), UInt8(ascii: "/"), + ] + + private static func encode(alphabet: [UInt8], firstByte: UInt8) -> UInt8 { + let index = firstByte >> 2 + return alphabet[Int(index)] + } + + private static func encode(alphabet: [UInt8], firstByte: UInt8, secondByte: UInt8?) -> UInt8 { + var index = (firstByte & 0b00000011) << 4 + if let secondByte = secondByte { + index += (secondByte & 0b11110000) >> 4 + } + return alphabet[Int(index)] + } - static let decoding0: [UInt32] = [ + private static func encode(alphabet: [UInt8], secondByte: UInt8?, thirdByte: UInt8?) -> UInt8 { + guard let secondByte = secondByte else { + // No second byte means we are just emitting padding. + return Base64.encodePaddingCharacter + } + var index = (secondByte & 0b00001111) << 2 + if let thirdByte = thirdByte { + index += (thirdByte & 0b11000000) >> 6 + } + return alphabet[Int(index)] + } + + private static func encode(alphabet: [UInt8], thirdByte: UInt8?) -> UInt8 { + guard let thirdByte = thirdByte else { + // No third byte means just padding. + return Base64.encodePaddingCharacter + } + let index = thirdByte & 0b00111111 + return alphabet[Int(index)] + } + + private static let badCharacter: UInt32 = 0x01FF_FFFF + + private static let decoding0: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, @@ -281,7 +369,7 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] - static let decoding1: [UInt32] = [ + private static let decoding1: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, @@ -327,7 +415,7 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] - static let decoding2: [UInt32] = [ + private static let decoding2: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, @@ -373,7 +461,7 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] - static let decoding3: [UInt32] = [ + private static let decoding3: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, @@ -419,7 +507,7 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] - static let decoding0url: [UInt32] = [ + private static let decoding0url: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 0 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 6 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 12 @@ -465,7 +553,7 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] - static let decoding1url: [UInt32] = [ + private static let decoding1url: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 0 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 6 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 12 @@ -511,7 +599,7 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] - static let decoding2url: [UInt32] = [ + private static let decoding2url: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 0 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 6 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 12 @@ -557,7 +645,7 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] - static let decoding3url: [UInt32] = [ + private static let decoding3url: [UInt32] = [ 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 0 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 6 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, // 12 @@ -603,3 +691,42 @@ extension Base64 { 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, 0x01FF_FFFF, ] } + +extension String { + /// This is a backport of a proposed String initializer that will allow writing directly into an uninitialized String's backing memory. + /// + /// As this API does not exist prior to 5.3 on Linux, or on older Apple platforms, we fake it out with a pointer and accept the extra copy. + init( + backportUnsafeUninitializedCapacity capacity: Int, + initializingUTF8With initializer: (_ buffer: UnsafeMutableBufferPointer) throws -> Int + ) rethrows { + + // The buffer will store zero terminated C string + let buffer = UnsafeMutableBufferPointer.allocate(capacity: capacity + 1) + defer { + buffer.deallocate() + } + + let initializedCount = try initializer(buffer) + precondition(initializedCount <= capacity, "Overran buffer in initializer!") + + // add zero termination + buffer[initializedCount] = 0 + + self = String(cString: buffer.baseAddress!) + } + + init( + customUnsafeUninitializedCapacity capacity: Int, + initializingUTF8With initializer: (_ buffer: UnsafeMutableBufferPointer) throws -> Int + ) rethrows { + if #available(macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0, *) { + try self.init(unsafeUninitializedCapacity: capacity, initializingUTF8With: initializer) + } else { + try self.init( + backportUnsafeUninitializedCapacity: capacity, + initializingUTF8With: initializer + ) + } + } +} diff --git a/Sources/GRPCCore/Metadata.swift b/Sources/GRPCCore/Metadata.swift index 0b2e560f8..217c8a784 100644 --- a/Sources/GRPCCore/Metadata.swift +++ b/Sources/GRPCCore/Metadata.swift @@ -85,6 +85,17 @@ public struct Metadata: Sendable, Hashable { public enum Value: Sendable, Hashable { case string(String) case binary([UInt8]) + + /// The value as a String. If it was originally stored as a binary, the base64-encoded String version + /// of the binary data will be returned instead. + public func encoded() -> String { + switch self { + case .string(let string): + return string + case .binary(let bytes): + return Base64.encode(bytes: bytes) + } + } } /// A metadata key-value pair. diff --git a/Sources/GRPCHTTP2Core/Compression/CompressionAlgorithm.swift b/Sources/GRPCHTTP2Core/Compression/CompressionAlgorithm.swift new file mode 100644 index 000000000..82de83eed --- /dev/null +++ b/Sources/GRPCHTTP2Core/Compression/CompressionAlgorithm.swift @@ -0,0 +1,52 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// Supported message compression algorithms. +/// +/// These algorithms are indicated in the "grpc-encoding" header. As such, a lack of "grpc-encoding" +/// header indicates that there is no message compression. +public struct CompressionAlgorithm: Hashable, Sendable { + /// Identity compression; "no" compression but indicated via the "grpc-encoding" header. + public static let identity = CompressionAlgorithm(.identity) + public static let deflate = CompressionAlgorithm(.deflate) + public static let gzip = CompressionAlgorithm(.gzip) + + // The order here is important: most compression to least. + public static let all: [CompressionAlgorithm] = [.gzip, .deflate, .identity] + + public var name: String { + return self.algorithm.rawValue + } + + internal enum Algorithm: String { + case identity + case deflate + case gzip + } + + internal let algorithm: Algorithm + + private init(_ algorithm: Algorithm) { + self.algorithm = algorithm + } + + internal init?(rawValue: String) { + guard let algorithm = Algorithm(rawValue: rawValue) else { + return nil + } + self.algorithm = algorithm + } +} diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift new file mode 100644 index 000000000..0cd0ca797 --- /dev/null +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -0,0 +1,1404 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import NIOCore +import NIOHPACK +import NIOHTTP1 + +enum Scheme: String { + case http + case https +} + +enum GRPCStreamStateMachineConfiguration { + case client(ClientConfiguration) + case server(ServerConfiguration) + + struct ClientConfiguration { + var methodDescriptor: MethodDescriptor + var scheme: Scheme + var outboundEncoding: CompressionAlgorithm + var acceptedEncodings: [CompressionAlgorithm] + } + + struct ServerConfiguration { + var scheme: Scheme + var acceptedEncodings: [CompressionAlgorithm] + } +} + +private enum GRPCStreamStateMachineState { + case clientIdleServerIdle(ClientIdleServerIdleState) + case clientOpenServerIdle(ClientOpenServerIdleState) + case clientOpenServerOpen(ClientOpenServerOpenState) + case clientOpenServerClosed(ClientOpenServerClosedState) + case clientClosedServerIdle(ClientClosedServerIdleState) + case clientClosedServerOpen(ClientClosedServerOpenState) + case clientClosedServerClosed(ClientClosedServerClosedState) + + struct ClientIdleServerIdleState { + let maximumPayloadSize: Int + } + + struct ClientOpenServerIdleState { + let maximumPayloadSize: Int + var framer: GRPCMessageFramer + var compressor: Zlib.Compressor? + var outboundCompression: CompressionAlgorithm? + + // The deframer must be optional because the client will not have one configured + // until the server opens and sends a grpc-encoding header. + // It will be present for the server though, because even though it's idle, + // it can still receive compressed messages from the client. + let deframer: NIOSingleStepByteToMessageProcessor? + var decompressor: Zlib.Decompressor? + + var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + + init( + previousState: ClientIdleServerIdleState, + compressor: Zlib.Compressor?, + framer: GRPCMessageFramer, + decompressor: Zlib.Decompressor?, + deframer: NIOSingleStepByteToMessageProcessor? + ) { + self.maximumPayloadSize = previousState.maximumPayloadSize + self.compressor = compressor + self.framer = framer + self.decompressor = decompressor + self.deframer = deframer + self.inboundMessageBuffer = .init() + } + } + + struct ClientOpenServerOpenState { + var framer: GRPCMessageFramer + var compressor: Zlib.Compressor? + + let deframer: NIOSingleStepByteToMessageProcessor + var decompressor: Zlib.Decompressor? + + var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + + init( + previousState: ClientOpenServerIdleState, + deframer: NIOSingleStepByteToMessageProcessor, + decompressor: Zlib.Decompressor? + ) { + self.framer = previousState.framer + self.compressor = previousState.compressor + + self.deframer = deframer + self.decompressor = decompressor + + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + } + + struct ClientOpenServerClosedState { + var framer: GRPCMessageFramer + var compressor: Zlib.Compressor? + + let deframer: NIOSingleStepByteToMessageProcessor? + var decompressor: Zlib.Decompressor? + + var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + + init(previousState: ClientOpenServerOpenState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.deframer = previousState.deframer + self.decompressor = previousState.decompressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + + init(previousState: ClientOpenServerIdleState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + // The server went directly from idle to closed - this means it sent a + // trailers-only response: + // - if we're the client, the previous state was a nil deframer, but that + // is okay because we don't need a deframer as the server won't be sending + // any messages; + // - if we're the server, we'll keep whatever deframer we had. + self.deframer = previousState.deframer + self.decompressor = previousState.decompressor + } + } + + struct ClientClosedServerIdleState { + let maximumPayloadSize: Int + var framer: GRPCMessageFramer + var compressor: Zlib.Compressor? + var outboundCompression: CompressionAlgorithm? + + let deframer: NIOSingleStepByteToMessageProcessor? + var decompressor: Zlib.Decompressor? + + var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + + /// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's + /// initial metadata). We don't need to know a decompression algorithm, since we won't receive + /// any more messages from the client anyways, as it's closed. + init( + previousState: ClientIdleServerIdleState, + compressionAlgorithm: CompressionAlgorithm + ) { + self.maximumPayloadSize = previousState.maximumPayloadSize + + if let zlibMethod = Zlib.Method(encoding: compressionAlgorithm) { + self.compressor = Zlib.Compressor(method: zlibMethod) + } + self.framer = GRPCMessageFramer() + self.outboundCompression = compressionAlgorithm + // We don't need a deframer since we won't receive any messages from the + // client: it's closed. + self.deframer = nil + self.inboundMessageBuffer = .init() + } + + init(previousState: ClientOpenServerIdleState) { + self.maximumPayloadSize = previousState.maximumPayloadSize + self.framer = previousState.framer + self.compressor = previousState.compressor + self.outboundCompression = previousState.outboundCompression + self.deframer = previousState.deframer + self.decompressor = previousState.decompressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + } + + struct ClientClosedServerOpenState { + var framer: GRPCMessageFramer + var compressor: Zlib.Compressor? + + let deframer: NIOSingleStepByteToMessageProcessor? + var decompressor: Zlib.Decompressor? + + var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + + init(previousState: ClientOpenServerOpenState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.deframer = previousState.deframer + self.decompressor = previousState.decompressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + + /// This should be called from the server path, as the deframer will already be configured in this scenario. + init(previousState: ClientClosedServerIdleState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + + // In the case of the server, we don't need to deframe/decompress any more + // messages, since the client's closed. + self.deframer = nil + self.decompressor = nil + + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + + /// This should only be called from the client path, as the deframer has not yet been set up. + init( + previousState: ClientClosedServerIdleState, + decompressionAlgorithm: CompressionAlgorithm + ) { + self.framer = previousState.framer + self.compressor = previousState.compressor + + // In the case of the client, it will only be able to set up the deframer + // after it receives the chosen encoding from the server. + if let zlibMethod = Zlib.Method(encoding: decompressionAlgorithm) { + self.decompressor = Zlib.Decompressor(method: zlibMethod) + } + let decoder = GRPCMessageDeframer( + maximumPayloadSize: previousState.maximumPayloadSize, + decompressor: self.decompressor + ) + self.deframer = NIOSingleStepByteToMessageProcessor(decoder) + + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + } + + struct ClientClosedServerClosedState { + // We still need the framer and compressor in case the server has closed + // but its buffer is not yet empty and still needs to send messages out to + // the client. + var framer: GRPCMessageFramer + var compressor: Zlib.Compressor? + + // These are already deframed, so we don't need the deframer anymore. + var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + + init(previousState: ClientClosedServerOpenState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + + init(previousState: ClientClosedServerIdleState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + + init(previousState: ClientOpenServerIdleState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + + init(previousState: ClientOpenServerClosedState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +struct GRPCStreamStateMachine { + private var state: GRPCStreamStateMachineState + private var configuration: GRPCStreamStateMachineConfiguration + private var skipAssertions: Bool + + init( + configuration: GRPCStreamStateMachineConfiguration, + maximumPayloadSize: Int, + skipAssertions: Bool = false + ) { + self.state = .clientIdleServerIdle(.init(maximumPayloadSize: maximumPayloadSize)) + self.configuration = configuration + self.skipAssertions = skipAssertions + } + + mutating func send(metadata: Metadata) throws -> HPACKHeaders { + switch self.configuration { + case .client(let clientConfiguration): + return try self.clientSend(metadata: metadata, configuration: clientConfiguration) + case .server(let serverConfiguration): + return try self.serverSend(metadata: metadata, configuration: serverConfiguration) + } + } + + mutating func send(message: [UInt8], endStream: Bool) throws { + switch self.configuration { + case .client: + try self.clientSend(message: message, endStream: endStream) + case .server: + if endStream { + try self.invalidState( + "Can't end response stream by sending a message - send(status:metadata:trailersOnly:) must be called" + ) + } + try self.serverSend(message: message) + } + } + + mutating func send( + status: Status, + metadata: Metadata + ) throws -> HPACKHeaders { + switch self.configuration { + case .client: + try self.invalidState( + "Client cannot send status and trailer." + ) + case .server: + return try self.serverSend( + status: status, + customMetadata: metadata + ) + } + } + + enum OnMetadataReceived: Equatable { + case receivedMetadata(Metadata) + + // Client-specific actions + case receivedStatusAndMetadata(status: Status, metadata: Metadata) + case doNothing + + // Server-specific actions + case rejectRPC(trailers: HPACKHeaders) + } + + mutating func receive(metadata: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived { + switch self.configuration { + case .client: + return try self.clientReceive(metadata: metadata, endStream: endStream) + case .server(let serverConfiguration): + return try self.serverReceive( + metadata: metadata, + endStream: endStream, + configuration: serverConfiguration + ) + } + } + + mutating func receive(message: ByteBuffer, endStream: Bool) throws { + switch self.configuration { + case .client: + try self.clientReceive(bytes: message, endStream: endStream) + case .server: + try self.serverReceive(bytes: message, endStream: endStream) + } + } + + /// The result of requesting the next outbound message. + enum OnNextOutboundMessage: Equatable { + /// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done + /// writing messages (i.e. we are now closed). + case noMoreMessages + /// There isn't a message ready to be sent, but we could still receive more, so keep trying. + case awaitMoreMessages + /// A message is ready to be sent. + case sendMessage(ByteBuffer) + } + + mutating func nextOutboundMessage() throws -> OnNextOutboundMessage { + switch self.configuration { + case .client: + return try self.clientNextOutboundMessage() + case .server: + return try self.serverNextOutboundMessage() + } + } + + /// The result of requesting the next inbound message. + enum OnNextInboundMessage: Equatable { + /// The sender is done writing messages and there are no more messages to be received. + case noMoreMessages + /// There isn't a message ready to be sent, but we could still receive more, so keep trying. + case awaitMoreMessages + /// A message has been received. + case receiveMessage([UInt8]) + } + + mutating func nextInboundMessage() -> OnNextInboundMessage { + switch self.configuration { + case .client: + return self.clientNextInboundMessage() + case .server: + return self.serverNextInboundMessage() + } + } + + mutating func tearDown() { + switch self.state { + case .clientIdleServerIdle: + () + case .clientOpenServerIdle(let state): + state.compressor?.end() + state.decompressor?.end() + case .clientOpenServerOpen(let state): + state.compressor?.end() + state.decompressor?.end() + case .clientOpenServerClosed(let state): + state.compressor?.end() + state.decompressor?.end() + case .clientClosedServerIdle(let state): + state.compressor?.end() + state.decompressor?.end() + case .clientClosedServerOpen(let state): + state.compressor?.end() + state.decompressor?.end() + case .clientClosedServerClosed(let state): + state.compressor?.end() + } + } +} + +// - MARK: Client + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCStreamStateMachine { + private func makeClientHeaders( + methodDescriptor: MethodDescriptor, + scheme: Scheme, + outboundEncoding: CompressionAlgorithm?, + acceptedEncodings: [CompressionAlgorithm], + customMetadata: Metadata + ) -> HPACKHeaders { + var headers = HPACKHeaders() + headers.reserveCapacity(7 + customMetadata.count) + + // Add required headers. + // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + + // The order is important here: reserved HTTP2 headers (those starting with `:`) + // must come before all other headers. + headers.add("POST", forKey: .method) + headers.add(scheme.rawValue, forKey: .scheme) + headers.add(methodDescriptor.fullyQualifiedMethod, forKey: .path) + + // Add required gRPC headers. + headers.add(ContentType.grpc.canonicalValue, forKey: .contentType) + headers.add("trailers", forKey: .te) // Used to detect incompatible proxies + + if let encoding = outboundEncoding, encoding != .identity { + headers.add(encoding.name, forKey: .encoding) + } + + for acceptedEncoding in acceptedEncodings { + headers.add(acceptedEncoding.name, forKey: .acceptEncoding) + } + + for metadataPair in customMetadata { + headers.add(name: metadataPair.key, value: metadataPair.value.encoded()) + } + + return headers + } + + private mutating func clientSend( + metadata: Metadata, + configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration + ) throws -> HPACKHeaders { + // Client sends metadata only when opening the stream. + switch self.state { + case .clientIdleServerIdle(let state): + let outboundEncoding = configuration.outboundEncoding + let compressor = Zlib.Method(encoding: outboundEncoding) + .flatMap { Zlib.Compressor(method: $0) } + self.state = .clientOpenServerIdle( + .init( + previousState: state, + compressor: compressor, + framer: GRPCMessageFramer(), + decompressor: nil, + deframer: nil + ) + ) + return self.makeClientHeaders( + methodDescriptor: configuration.methodDescriptor, + scheme: configuration.scheme, + outboundEncoding: configuration.outboundEncoding, + acceptedEncodings: configuration.acceptedEncodings, + customMetadata: metadata + ) + case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed: + try self.invalidState( + "Client is already open: shouldn't be sending metadata." + ) + case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed: + try self.invalidState( + "Client is closed: can't send metadata." + ) + } + } + + private mutating func clientSend(message: [UInt8], endStream: Bool) throws { + // Client sends message. + switch self.state { + case .clientIdleServerIdle: + try self.invalidState("Client not yet open.") + case .clientOpenServerIdle(var state): + state.framer.append(message) + if endStream { + self.state = .clientClosedServerIdle(.init(previousState: state)) + } else { + self.state = .clientOpenServerIdle(state) + } + case .clientOpenServerOpen(var state): + state.framer.append(message) + if endStream { + self.state = .clientClosedServerOpen(.init(previousState: state)) + } else { + self.state = .clientOpenServerOpen(state) + } + case .clientOpenServerClosed(let state): + // The server has closed, so it makes no sense to send the rest of the request. + // However, do close if endStream is set. + if endStream { + self.state = .clientClosedServerClosed(.init(previousState: state)) + } + case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed: + try self.invalidState( + "Client is closed, cannot send a message." + ) + } + } + + /// Returns the client's next request to the server. + /// - Returns: The request to be made to the server. + private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage { + switch self.state { + case .clientIdleServerIdle: + try self.invalidState("Client is not open yet.") + case .clientOpenServerIdle(var state): + let request = try state.framer.next(compressor: state.compressor) + self.state = .clientOpenServerIdle(state) + return request.map { .sendMessage($0) } ?? .awaitMoreMessages + case .clientOpenServerOpen(var state): + let request = try state.framer.next(compressor: state.compressor) + self.state = .clientOpenServerOpen(state) + return request.map { .sendMessage($0) } ?? .awaitMoreMessages + case .clientClosedServerIdle(var state): + let request = try state.framer.next(compressor: state.compressor) + self.state = .clientClosedServerIdle(state) + if let request { + return .sendMessage(request) + } else { + return .noMoreMessages + } + case .clientClosedServerOpen(var state): + let request = try state.framer.next(compressor: state.compressor) + self.state = .clientClosedServerOpen(state) + if let request { + return .sendMessage(request) + } else { + return .noMoreMessages + } + case .clientOpenServerClosed, .clientClosedServerClosed: + // No point in sending any more requests if the server is closed. + return .noMoreMessages + } + } + + private enum ServerHeadersValidationResult { + case valid + case invalid(OnMetadataReceived) + } + + private mutating func clientValidateHeadersReceivedFromServer( + _ metadata: HPACKHeaders + ) -> ServerHeadersValidationResult { + var httpStatus: String? { + metadata.firstString(forKey: .status) + } + var grpcStatus: Status.Code? { + metadata.firstString(forKey: .grpcStatus) + .flatMap { Int($0) } + .flatMap { Status.Code(rawValue: $0) } + } + guard httpStatus == "200" || grpcStatus != nil else { + let httpStatusCode = + httpStatus + .flatMap { Int($0) } + .map { HTTPResponseStatus(statusCode: $0) } + + guard let httpStatusCode else { + return .invalid( + .receivedStatusAndMetadata( + status: .init(code: .unknown, message: "HTTP Status Code is missing."), + metadata: Metadata(headers: metadata) + ) + ) + } + + if (100 ... 199).contains(httpStatusCode.code) { + // For 1xx status codes, the entire header should be skipped and a + // subsequent header should be read. + // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md + return .invalid(.doNothing) + } + + // Forward the mapped status code. + return .invalid( + .receivedStatusAndMetadata( + status: .init( + code: Status.Code(httpStatusCode: httpStatusCode), + message: "Unexpected non-200 HTTP Status Code." + ), + metadata: Metadata(headers: metadata) + ) + ) + } + + let contentTypeHeader = metadata.first(name: GRPCHTTP2Keys.contentType.rawValue) + guard contentTypeHeader.flatMap(ContentType.init) != nil else { + return .invalid( + .receivedStatusAndMetadata( + status: .init( + code: .internalError, + message: "Missing \(GRPCHTTP2Keys.contentType) header" + ), + metadata: Metadata(headers: metadata) + ) + ) + } + + return .valid + } + + private enum ProcessInboundEncodingResult { + case error(OnMetadataReceived) + case success(CompressionAlgorithm) + } + + private func processInboundEncoding(_ metadata: HPACKHeaders) -> ProcessInboundEncodingResult { + let inboundEncoding: CompressionAlgorithm + if let serverEncoding = metadata.first(name: GRPCHTTP2Keys.encoding.rawValue) { + guard let parsedEncoding = CompressionAlgorithm(rawValue: serverEncoding) else { + return .error( + .receivedStatusAndMetadata( + status: .init( + code: .internalError, + message: + "The server picked a compression algorithm ('\(serverEncoding)') the client does not know about." + ), + metadata: Metadata(headers: metadata) + ) + ) + } + inboundEncoding = parsedEncoding + } else { + inboundEncoding = .identity + } + return .success(inboundEncoding) + } + + private func validateAndReturnStatusAndMetadata( + _ metadata: HPACKHeaders + ) throws -> OnMetadataReceived { + let rawStatusCode = metadata.firstString(forKey: .grpcStatus) + guard let rawStatusCode, + let intStatusCode = Int(rawStatusCode), + let statusCode = Status.Code(rawValue: intStatusCode) + else { + let message = + "Non-initial metadata must be a trailer containing a valid grpc-status" + + (rawStatusCode.flatMap { "but was \($0)" } ?? "") + throw RPCError(code: .unknown, message: message) + } + + let statusMessage = + metadata.firstString(forKey: .grpcStatusMessage) + .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? "" + + var convertedMetadata = Metadata(headers: metadata) + convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue) + convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue) + + return .receivedStatusAndMetadata( + status: Status(code: statusCode, message: statusMessage), + metadata: convertedMetadata + ) + } + + private mutating func clientReceive( + metadata: HPACKHeaders, + endStream: Bool + ) throws -> OnMetadataReceived { + switch self.state { + case .clientOpenServerIdle(let state): + switch (self.clientValidateHeadersReceivedFromServer(metadata), endStream) { + case (.invalid(let action), true): + // The headers are invalid, but the server signalled that it was + // closing the stream, so close both client and server. + self.state = .clientClosedServerClosed(.init(previousState: state)) + return action + case (.invalid(let action), false): + self.state = .clientClosedServerIdle(.init(previousState: state)) + return action + case (.valid, true): + // This is a trailers-only response: close server. + self.state = .clientOpenServerClosed(.init(previousState: state)) + return try self.validateAndReturnStatusAndMetadata(metadata) + case (.valid, false): + switch self.processInboundEncoding(metadata) { + case .error(let failure): + return failure + case .success(let inboundEncoding): + let decompressor = Zlib.Method(encoding: inboundEncoding) + .flatMap { Zlib.Decompressor(method: $0) } + let deframer = GRPCMessageDeframer( + maximumPayloadSize: state.maximumPayloadSize, + decompressor: decompressor + ) + + self.state = .clientOpenServerOpen( + .init( + previousState: state, + deframer: NIOSingleStepByteToMessageProcessor(deframer), + decompressor: decompressor + ) + ) + return .receivedMetadata(Metadata(headers: metadata)) + } + } + + case .clientOpenServerOpen(let state): + // This state is valid even if endStream is not set: server can send + // trailing metadata without END_STREAM set, and follow it with an + // empty message frame where it is set. + // However, we must make sure that grpc-status is set, otherwise this + // is an invalid state. + if endStream { + self.state = .clientOpenServerClosed(.init(previousState: state)) + } + return try self.validateAndReturnStatusAndMetadata(metadata) + + case .clientClosedServerIdle(let state): + switch (self.clientValidateHeadersReceivedFromServer(metadata), endStream) { + case (.invalid(let action), true): + // The headers are invalid, but the server signalled that it was + // closing the stream, so close the server side too. + self.state = .clientClosedServerClosed(.init(previousState: state)) + return action + case (.invalid(let action), false): + // Client is already closed, so we don't need to update our state. + return action + case (.valid, true): + // This is a trailers-only response: close server. + self.state = .clientClosedServerClosed(.init(previousState: state)) + return try self.validateAndReturnStatusAndMetadata(metadata) + case (.valid, false): + switch self.processInboundEncoding(metadata) { + case .error(let failure): + return failure + case .success(let inboundEncoding): + self.state = .clientClosedServerOpen( + .init( + previousState: state, + decompressionAlgorithm: inboundEncoding + ) + ) + return .receivedMetadata(Metadata(headers: metadata)) + } + } + + case .clientClosedServerOpen(let state): + // This state is valid even if endStream is not set: server can send + // trailing metadata without END_STREAM set, and follow it with an + // empty message frame where it is set. + // However, we must make sure that grpc-status is set, otherwise this + // is an invalid state. + if endStream { + self.state = .clientClosedServerClosed(.init(previousState: state)) + } + return try self.validateAndReturnStatusAndMetadata(metadata) + + case .clientClosedServerClosed: + // We could end up here if we received a grpc-status header in a previous + // frame (which would have already close the server) and then we receive + // an empty frame with EOS set. + // We wouldn't want to throw in that scenario, so we just ignore it. + // Note that we don't want to ignore it if EOS is not set here though, as + // then it would be an invalid payload. + if !endStream || metadata.count > 0 { + try self.invalidState( + "Server is closed, nothing could have been sent." + ) + } + return .doNothing + case .clientIdleServerIdle: + try self.invalidState( + "Server cannot have sent metadata if the client is idle." + ) + case .clientOpenServerClosed: + try self.invalidState( + "Server is closed, nothing could have been sent." + ) + } + } + + private mutating func clientReceive(bytes: ByteBuffer, endStream: Bool) throws { + // This is a message received by the client, from the server. + switch self.state { + case .clientIdleServerIdle: + try self.invalidState( + "Cannot have received anything from server if client is not yet open." + ) + case .clientOpenServerIdle, .clientClosedServerIdle: + try self.invalidState( + "Server cannot have sent a message before sending the initial metadata." + ) + case .clientOpenServerOpen(var state): + try state.deframer.process(buffer: bytes) { deframedMessage in + state.inboundMessageBuffer.append(deframedMessage) + } + if endStream { + self.state = .clientOpenServerClosed(.init(previousState: state)) + } else { + self.state = .clientOpenServerOpen(state) + } + case .clientClosedServerOpen(var state): + // The client may have sent the end stream and thus it's closed, + // but the server may still be responding. + // The client must have a deframer set up, so force-unwrap is okay. + try state.deframer!.process(buffer: bytes) { deframedMessage in + state.inboundMessageBuffer.append(deframedMessage) + } + if endStream { + self.state = .clientClosedServerClosed(.init(previousState: state)) + } else { + self.state = .clientClosedServerOpen(state) + } + case .clientOpenServerClosed, .clientClosedServerClosed: + try self.invalidState( + "Cannot have received anything from a closed server." + ) + } + } + + private mutating func clientNextInboundMessage() -> OnNextInboundMessage { + switch self.state { + case .clientOpenServerOpen(var state): + let message = state.inboundMessageBuffer.pop() + self.state = .clientOpenServerOpen(state) + return message.map { .receiveMessage($0) } ?? .awaitMoreMessages + case .clientOpenServerClosed(var state): + let message = state.inboundMessageBuffer.pop() + self.state = .clientOpenServerClosed(state) + return message.map { .receiveMessage($0) } ?? .noMoreMessages + case .clientClosedServerOpen(var state): + let message = state.inboundMessageBuffer.pop() + self.state = .clientClosedServerOpen(state) + return message.map { .receiveMessage($0) } ?? .awaitMoreMessages + case .clientClosedServerClosed(var state): + let message = state.inboundMessageBuffer.pop() + self.state = .clientClosedServerClosed(state) + return message.map { .receiveMessage($0) } ?? .noMoreMessages + case .clientIdleServerIdle, + .clientOpenServerIdle, + .clientClosedServerIdle: + return .awaitMoreMessages + } + } + + private func invalidState(_ message: String, line: UInt = #line) throws -> Never { + if !self.skipAssertions { + assertionFailure(message, line: line) + } + throw RPCError(code: .internalError, message: message) + } +} + +// - MARK: Server + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCStreamStateMachine { + private func makeResponseHeaders( + outboundEncoding: CompressionAlgorithm?, + configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration, + customMetadata: Metadata + ) -> HPACKHeaders { + // Response headers always contain :status (HTTP Status 200) and content-type. + // They may also contain grpc-encoding, grpc-accept-encoding, and custom metadata. + var headers = HPACKHeaders() + headers.reserveCapacity(4 + customMetadata.count) + + headers.add("200", forKey: .status) + headers.add(ContentType.grpc.canonicalValue, forKey: .contentType) + + if let outboundEncoding, outboundEncoding != .identity { + headers.add(outboundEncoding.name, forKey: .encoding) + } + + for acceptedEncoding in configuration.acceptedEncodings { + headers.add(acceptedEncoding.name, forKey: .acceptEncoding) + } + + for metadataPair in customMetadata { + headers.add(name: metadataPair.key, value: metadataPair.value.encoded()) + } + + return headers + } + + private mutating func serverSend( + metadata: Metadata, + configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration + ) throws -> HPACKHeaders { + // Server sends initial metadata + switch self.state { + case .clientOpenServerIdle(let state): + self.state = .clientOpenServerOpen( + .init( + previousState: state, + // In the case of the server, it will already have a deframer set up, + // because it already knows what encoding the client is using: + // it's okay to force-unwrap. + deframer: state.deframer!, + decompressor: state.decompressor + ) + ) + return self.makeResponseHeaders( + outboundEncoding: state.outboundCompression, + configuration: configuration, + customMetadata: metadata + ) + case .clientClosedServerIdle(let state): + self.state = .clientClosedServerOpen(.init(previousState: state)) + return self.makeResponseHeaders( + outboundEncoding: state.outboundCompression, + configuration: configuration, + customMetadata: metadata + ) + case .clientIdleServerIdle: + try self.invalidState( + "Client cannot be idle if server is sending initial metadata: it must have opened." + ) + case .clientOpenServerClosed, .clientClosedServerClosed: + try self.invalidState( + "Server cannot send metadata if closed." + ) + case .clientOpenServerOpen, .clientClosedServerOpen: + try self.invalidState( + "Server has already sent initial metadata." + ) + } + } + + private mutating func serverSend(message: [UInt8]) throws { + switch self.state { + case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle: + try self.invalidState( + "Server must have sent initial metadata before sending a message." + ) + case .clientOpenServerOpen(var state): + state.framer.append(message) + self.state = .clientOpenServerOpen(state) + case .clientClosedServerOpen(var state): + state.framer.append(message) + self.state = .clientClosedServerOpen(state) + case .clientOpenServerClosed, .clientClosedServerClosed: + try self.invalidState( + "Server can't send a message if it's closed." + ) + } + } + + private func makeTrailers( + status: Status, + customMetadata: Metadata?, + trailersOnly: Bool + ) -> HPACKHeaders { + // Trailers always contain the grpc-status header, and optionally, + // grpc-status-message, and custom metadata. + // If it's a trailers-only response, they will also contain :status and + // content-type. + var headers = HPACKHeaders() + let customMetadataCount = customMetadata?.count ?? 0 + if trailersOnly { + // Reserve 4 for capacity: 3 for the required headers, and 1 for the + // optional status message. + headers.reserveCapacity(4 + customMetadataCount) + headers.add("200", forKey: .status) + headers.add(ContentType.grpc.canonicalValue, forKey: .contentType) + } else { + // Reserve 2 for capacity: one for the required grpc-status, and + // one for the optional message. + headers.reserveCapacity(2 + customMetadataCount) + } + + headers.add(String(status.code.rawValue), forKey: .grpcStatus) + + if !status.message.isEmpty { + if let percentEncodedMessage = GRPCStatusMessageMarshaller.marshall(status.message) { + headers.add(percentEncodedMessage, forKey: .grpcStatusMessage) + } + } + + if let customMetadata { + for metadataPair in customMetadata { + headers.add(name: metadataPair.key, value: metadataPair.value.encoded()) + } + } + + return headers + } + + private mutating func serverSend( + status: Status, + customMetadata: Metadata + ) throws -> HPACKHeaders { + // Close the server. + switch self.state { + case .clientOpenServerOpen(let state): + self.state = .clientOpenServerClosed(.init(previousState: state)) + return self.makeTrailers( + status: status, + customMetadata: customMetadata, + trailersOnly: false + ) + case .clientClosedServerOpen(let state): + self.state = .clientClosedServerClosed(.init(previousState: state)) + return self.makeTrailers( + status: status, + customMetadata: customMetadata, + trailersOnly: false + ) + case .clientOpenServerIdle(let state): + self.state = .clientOpenServerClosed(.init(previousState: state)) + return self.makeTrailers( + status: status, + customMetadata: customMetadata, + trailersOnly: true + ) + case .clientClosedServerIdle(let state): + self.state = .clientClosedServerClosed(.init(previousState: state)) + return self.makeTrailers( + status: status, + customMetadata: customMetadata, + trailersOnly: true + ) + case .clientIdleServerIdle: + try self.invalidState( + "Server can't send status if client is idle." + ) + case .clientOpenServerClosed, .clientClosedServerClosed: + try self.invalidState( + "Server can't send anything if closed." + ) + } + } + + private mutating func serverReceive( + metadata: HPACKHeaders, + endStream: Bool, + configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration + ) throws -> OnMetadataReceived { + switch self.state { + case .clientIdleServerIdle(let state): + let contentType = metadata.firstString(forKey: .contentType) + .flatMap { ContentType(value: $0) } + guard contentType != nil else { + // Respond with HTTP-level Unsupported Media Type status code. + var trailers = HPACKHeaders() + trailers.add("415", forKey: .status) + return .rejectRPC(trailers: trailers) + } + + let path = metadata.firstString(forKey: .path) + .flatMap { MethodDescriptor(fullyQualifiedMethod: $0) } + guard path != nil else { + let status = Status( + code: .unimplemented, + message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set." + ) + let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true) + return .rejectRPC(trailers: trailers) + } + + func isIdentityOrCompatibleEncoding(_ clientEncoding: CompressionAlgorithm) -> Bool { + clientEncoding == .identity || configuration.acceptedEncodings.contains(clientEncoding) + } + + // Firstly, find out if we support the client's chosen encoding, and reject + // the RPC if we don't. + let inboundEncoding: CompressionAlgorithm + let encodingValues = metadata.values( + forHeader: GRPCHTTP2Keys.encoding.rawValue, + canonicalForm: true + ) + var encodingValuesIterator = encodingValues.makeIterator() + if let rawEncoding = encodingValuesIterator.next() { + guard encodingValuesIterator.next() == nil else { + let status = Status( + code: .internalError, + message: "\(GRPCHTTP2Keys.encoding) must contain no more than one value." + ) + let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true) + return .rejectRPC(trailers: trailers) + } + + guard let clientEncoding = CompressionAlgorithm(rawValue: String(rawEncoding)), + isIdentityOrCompatibleEncoding(clientEncoding) + else { + let statusMessage: String + let customMetadata: Metadata? + if configuration.acceptedEncodings.isEmpty { + statusMessage = "Compression is not supported" + customMetadata = nil + } else { + statusMessage = """ + \(rawEncoding) compression is not supported; \ + supported algorithms are listed in grpc-accept-encoding + """ + customMetadata = { + var trailers = Metadata() + trailers.reserveCapacity(configuration.acceptedEncodings.count) + for acceptedEncoding in configuration.acceptedEncodings { + trailers.addString( + acceptedEncoding.name, + forKey: GRPCHTTP2Keys.acceptEncoding.rawValue + ) + } + return trailers + }() + } + + let trailers = self.makeTrailers( + status: Status(code: .unimplemented, message: statusMessage), + customMetadata: customMetadata, + trailersOnly: true + ) + return .rejectRPC(trailers: trailers) + } + + // Server supports client's encoding. + inboundEncoding = clientEncoding + } else { + inboundEncoding = .identity + } + + // Secondly, find a compatible encoding the server can use to compress outbound messages, + // based on the encodings the client has advertised. + var outboundEncoding: CompressionAlgorithm = .identity + let clientAdvertisedEncodings = metadata.values( + forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue, + canonicalForm: true + ) + // Find the preferred encoding and use it to compress responses. + // If it's identity, just skip it altogether, since we won't be + // compressing. + for clientAdvertisedEncoding in clientAdvertisedEncodings { + if let algorithm = CompressionAlgorithm(rawValue: String(clientAdvertisedEncoding)), + isIdentityOrCompatibleEncoding(algorithm) + { + outboundEncoding = algorithm + break + } + } + + if endStream { + self.state = .clientClosedServerIdle( + .init( + previousState: state, + compressionAlgorithm: outboundEncoding + ) + ) + } else { + let compressor = Zlib.Method(encoding: outboundEncoding) + .flatMap { Zlib.Compressor(method: $0) } + let decompressor = Zlib.Method(encoding: inboundEncoding) + .flatMap { Zlib.Decompressor(method: $0) } + let deframer = GRPCMessageDeframer( + maximumPayloadSize: state.maximumPayloadSize, + decompressor: decompressor + ) + + self.state = .clientOpenServerIdle( + .init( + previousState: state, + compressor: compressor, + framer: GRPCMessageFramer(), + decompressor: decompressor, + deframer: NIOSingleStepByteToMessageProcessor(deframer) + ) + ) + } + + return .receivedMetadata(Metadata(headers: metadata)) + case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed: + try self.invalidState( + "Client shouldn't have sent metadata twice." + ) + case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed: + try self.invalidState( + "Client can't have sent metadata if closed." + ) + } + } + + private mutating func serverReceive(bytes: ByteBuffer, endStream: Bool) throws { + switch self.state { + case .clientIdleServerIdle: + try self.invalidState( + "Can't have received a message if client is idle." + ) + case .clientOpenServerIdle(var state): + // Deframer must be present on the server side, as we know the decompression + // algorithm from the moment the client opens. + try state.deframer!.process(buffer: bytes) { deframedMessage in + state.inboundMessageBuffer.append(deframedMessage) + } + + if endStream { + self.state = .clientClosedServerIdle(.init(previousState: state)) + } else { + self.state = .clientOpenServerIdle(state) + } + case .clientOpenServerOpen(var state): + try state.deframer.process(buffer: bytes) { deframedMessage in + state.inboundMessageBuffer.append(deframedMessage) + } + + if endStream { + self.state = .clientClosedServerOpen(.init(previousState: state)) + } else { + self.state = .clientOpenServerOpen(state) + } + case .clientOpenServerClosed(let state): + // Client is not done sending request, but server has already closed. + // Ignore the rest of the request: do nothing, unless endStream is set, + // in which case close the client. + if endStream { + self.state = .clientClosedServerClosed(.init(previousState: state)) + } + case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed: + try self.invalidState( + "Client can't send a message if closed." + ) + } + } + + private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage { + switch self.state { + case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle: + try self.invalidState("Server is not open yet.") + case .clientOpenServerOpen(var state): + let response = try state.framer.next(compressor: state.compressor) + self.state = .clientOpenServerOpen(state) + return response.map { .sendMessage($0) } ?? .awaitMoreMessages + case .clientClosedServerOpen(var state): + let response = try state.framer.next(compressor: state.compressor) + self.state = .clientClosedServerOpen(state) + return response.map { .sendMessage($0) } ?? .awaitMoreMessages + case .clientOpenServerClosed(var state): + let response = try state.framer.next(compressor: state.compressor) + self.state = .clientOpenServerClosed(state) + if let response { + return .sendMessage(response) + } else { + return .noMoreMessages + } + case .clientClosedServerClosed(var state): + let response = try state.framer.next(compressor: state.compressor) + self.state = .clientClosedServerClosed(state) + if let response { + return .sendMessage(response) + } else { + return .noMoreMessages + } + } + } + + private mutating func serverNextInboundMessage() -> OnNextInboundMessage { + switch self.state { + case .clientOpenServerIdle(var state): + let request = state.inboundMessageBuffer.pop() + self.state = .clientOpenServerIdle(state) + return request.map { .receiveMessage($0) } ?? .awaitMoreMessages + case .clientOpenServerOpen(var state): + let request = state.inboundMessageBuffer.pop() + self.state = .clientOpenServerOpen(state) + return request.map { .receiveMessage($0) } ?? .awaitMoreMessages + case .clientOpenServerClosed(var state): + let request = state.inboundMessageBuffer.pop() + self.state = .clientOpenServerClosed(state) + return request.map { .receiveMessage($0) } ?? .awaitMoreMessages + case .clientClosedServerOpen(var state): + let request = state.inboundMessageBuffer.pop() + self.state = .clientClosedServerOpen(state) + return request.map { .receiveMessage($0) } ?? .noMoreMessages + case .clientClosedServerClosed(var state): + let request = state.inboundMessageBuffer.pop() + self.state = .clientClosedServerClosed(state) + return request.map { .receiveMessage($0) } ?? .noMoreMessages + case .clientClosedServerIdle: + return .noMoreMessages + case .clientIdleServerIdle: + return .awaitMoreMessages + } + } +} + +extension MethodDescriptor { + init?(fullyQualifiedMethod: String) { + let split = fullyQualifiedMethod.split(separator: "/") + guard split.count == 2 else { + return nil + } + self.init(service: String(split[0]), method: String(split[1])) + } +} + +internal enum GRPCHTTP2Keys: String { + case path = ":path" + case contentType = "content-type" + case encoding = "grpc-encoding" + case acceptEncoding = "grpc-accept-encoding" + case scheme = ":scheme" + case method = ":method" + case te = "te" + case status = ":status" + case grpcStatus = "grpc-status" + case grpcStatusMessage = "grpc-status-message" +} + +extension HPACKHeaders { + internal func firstString(forKey key: GRPCHTTP2Keys) -> String? { + self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map { + String($0) + } + } + + internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) { + self.add(name: key.rawValue, value: value) + } +} + +extension Zlib.Method { + init?(encoding: CompressionAlgorithm) { + switch encoding { + case .identity: + return nil + case .deflate: + self = .deflate + case .gzip: + self = .gzip + default: + return nil + } + } +} + +extension Metadata { + init(headers: HPACKHeaders) { + var metadata = Metadata() + metadata.reserveCapacity(headers.count) + for header in headers { + if header.name.hasSuffix("-bin") { + do { + let decodedBinary = try header.value.base64Decoded() + metadata.addBinary(decodedBinary, forKey: header.name) + } catch { + metadata.addString(header.value, forKey: header.name) + } + } else { + metadata.addString(header.value, forKey: header.name) + } + } + self = metadata + } +} + +extension Status.Code { + // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md + init(httpStatusCode: HTTPResponseStatus) { + switch httpStatusCode { + case .badRequest: + self = .internalError + case .unauthorized: + self = .unauthenticated + case .forbidden: + self = .permissionDenied + case .notFound: + self = .unimplemented + case .tooManyRequests, .badGateway, .serviceUnavailable, .gatewayTimeout: + self = .unavailable + default: + self = .unknown + } + } +} diff --git a/Sources/GRPCHTTP2Core/Internal/ContentType.swift b/Sources/GRPCHTTP2Core/Internal/ContentType.swift new file mode 100644 index 000000000..2e098d39f --- /dev/null +++ b/Sources/GRPCHTTP2Core/Internal/ContentType.swift @@ -0,0 +1,40 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// See: +// - https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md +enum ContentType { + case grpc + + init?(value: String) { + switch value { + case "application/grpc", + "application/grpc+proto": + self = .grpc + + default: + return nil + } + } + + var canonicalValue: String { + switch self { + case .grpc: + // This is more widely supported than "application/grpc+proto" + return "application/grpc" + } + } +} diff --git a/Sources/GRPCHTTP2Core/Internal/GRPCStatusMessageMarshaller.swift b/Sources/GRPCHTTP2Core/Internal/GRPCStatusMessageMarshaller.swift new file mode 100644 index 000000000..4f8b1eb40 --- /dev/null +++ b/Sources/GRPCHTTP2Core/Internal/GRPCStatusMessageMarshaller.swift @@ -0,0 +1,205 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +enum GRPCStatusMessageMarshaller { + /// Adds percent encoding to the given message. + /// + /// - Parameter message: Message to percent encode. + /// - Returns: Percent encoded string, or `nil` if it could not be encoded. + static func marshall(_ message: String) -> String? { + return percentEncode(message) + } + + /// Removes percent encoding from the given message. + /// + /// - Parameter message: Message to remove encoding from. + /// - Returns: The string with percent encoding removed, or the input string if the encoding + /// could not be removed. + static func unmarshall(_ message: String) -> String { + return removePercentEncoding(message) + } +} + +extension GRPCStatusMessageMarshaller { + /// Adds percent encoding to the given message. + /// + /// gRPC uses percent encoding as defined in RFC 3986 § 2.1 but with a different set of restricted + /// characters. The allowed characters are all visible printing characters except for (`%`, + /// `0x25`). That is: `0x20`-`0x24`, `0x26`-`0x7E`. + /// + /// - Parameter message: The message to encode. + /// - Returns: Percent encoded string, or `nil` if it could not be encoded. + private static func percentEncode(_ message: String) -> String? { + let utf8 = message.utf8 + + let encodedLength = self.percentEncodedLength(for: utf8) + // Fast-path: all characters are valid, nothing to encode. + if encodedLength == utf8.count { + return message + } + + var bytes: [UInt8] = [] + bytes.reserveCapacity(encodedLength) + + for char in message.utf8 { + switch char { + // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses + case 0x20 ... 0x24, + 0x26 ... 0x7E: + bytes.append(char) + + default: + bytes.append(UInt8(ascii: "%")) + bytes.append(self.toHex(char >> 4)) + bytes.append(self.toHex(char & 0xF)) + } + } + + return String(decoding: bytes, as: UTF8.self) + } + + /// Returns the percent encoded length of the given `UTF8View`. + private static func percentEncodedLength(for view: String.UTF8View) -> Int { + var count = view.count + for byte in view { + switch byte { + case 0x20 ... 0x24, + 0x26 ... 0x7E: + () + + default: + count += 2 + } + } + return count + } + + /// Encode the given byte as hexadecimal. + /// + /// - Precondition: Only the four least significant bits may be set. + /// - Parameter nibble: The nibble to convert to hexadecimal. + private static func toHex(_ nibble: UInt8) -> UInt8 { + assert(nibble & 0xF == nibble) + + switch nibble { + case 0 ... 9: + return nibble &+ UInt8(ascii: "0") + default: + return nibble &+ (UInt8(ascii: "A") &- 10) + } + } + + /// Remove gRPC percent encoding from `message`. If any portion of the string could not be decoded + /// then the encoded message will be returned. + /// + /// - Parameter message: The message to remove percent encoding from. + /// - Returns: The decoded message. + private static func removePercentEncoding(_ message: String) -> String { + let utf8 = message.utf8 + + let decodedLength = self.percentDecodedLength(for: utf8) + // Fast-path: no decoding to do! Note that we may also have detected that the encoding is + // invalid, in which case we will return the encoded message: this is fine. + if decodedLength == utf8.count { + return message + } + + var chars: [UInt8] = [] + // We can't decode more characters than are already encoded. + chars.reserveCapacity(decodedLength) + + var currentIndex = utf8.startIndex + let endIndex = utf8.endIndex + + while currentIndex < endIndex { + let byte = utf8[currentIndex] + + switch byte { + case UInt8(ascii: "%"): + guard let (nextIndex, nextNextIndex) = utf8.nextTwoIndices(after: currentIndex), + let nextHex = fromHex(utf8[nextIndex]), + let nextNextHex = fromHex(utf8[nextNextIndex]) + else { + // If we can't decode the message, aborting and returning the encoded message is fine + // according to the spec. + return message + } + chars.append((nextHex << 4) | nextNextHex) + currentIndex = nextNextIndex + + default: + chars.append(byte) + } + + currentIndex = utf8.index(after: currentIndex) + } + + return String(decoding: chars, as: Unicode.UTF8.self) + } + + /// Returns the expected length of the decoded `UTF8View`. + private static func percentDecodedLength(for view: String.UTF8View) -> Int { + var encoded = 0 + + for byte in view { + switch byte { + case UInt8(ascii: "%"): + // This can't overflow since it can't be larger than view.count. + encoded &+= 1 + + default: + () + } + } + + let notEncoded = view.count - (encoded * 3) + + guard notEncoded >= 0 else { + // We've received gibberish: more '%' than expected. gRPC allows for the status message to + // be left encoded should it be incorrectly encoded. We'll do exactly that by returning + // the number of bytes in the view which will causes us to take the fast-path exit. + return view.count + } + + return notEncoded + encoded + } + + private static func fromHex(_ byte: UInt8) -> UInt8? { + switch byte { + case UInt8(ascii: "0") ... UInt8(ascii: "9"): + return byte &- UInt8(ascii: "0") + case UInt8(ascii: "A") ... UInt8(ascii: "Z"): + return byte &- (UInt8(ascii: "A") &- 10) + case UInt8(ascii: "a") ... UInt8(ascii: "z"): + return byte &- (UInt8(ascii: "a") &- 10) + default: + return nil + } + } +} + +extension String.UTF8View { + /// Return the next two valid indices after the given index. The indices are considered valid if + /// they less than `endIndex`. + fileprivate func nextTwoIndices(after index: Index) -> (Index, Index)? { + let secondIndex = self.index(index, offsetBy: 2) + guard secondIndex < self.endIndex else { + return nil + } + + return (self.index(after: index), secondIndex) + } +} diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift new file mode 100644 index 000000000..34e615623 --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -0,0 +1,2194 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import NIOCore +import NIOHPACK +import XCTest + +@testable import GRPCHTTP2Core + +private enum TargetStateMachineState: CaseIterable { + case clientIdleServerIdle + case clientOpenServerIdle + case clientOpenServerOpen + case clientOpenServerClosed + case clientClosedServerIdle + case clientClosedServerOpen + case clientClosedServerClosed +} + +extension HPACKHeaders { + // Client + fileprivate static let clientInitialMetadata: Self = [ + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.scheme.rawValue: "http", + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.te.rawValue: "trailers", + ] + fileprivate static let clientInitialMetadataWithDeflateCompression: Self = [ + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.scheme.rawValue: "https", + GRPCHTTP2Keys.te.rawValue: "te", + GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate", + GRPCHTTP2Keys.encoding.rawValue: "deflate", + ] + fileprivate static let clientInitialMetadataWithGzipCompression: Self = [ + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.scheme.rawValue: "https", + GRPCHTTP2Keys.te.rawValue: "te", + GRPCHTTP2Keys.acceptEncoding.rawValue: "gzip", + GRPCHTTP2Keys.encoding.rawValue: "gzip", + ] + fileprivate static let receivedWithoutContentType: Self = [ + GRPCHTTP2Keys.path.rawValue: "test/test" + ] + fileprivate static let receivedWithInvalidContentType: Self = [ + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.contentType.rawValue: "invalid/invalid", + ] + fileprivate static let receivedWithoutEndpoint: Self = [ + GRPCHTTP2Keys.contentType.rawValue: "application/grpc" + ] + + // Server + fileprivate static let serverInitialMetadata: Self = [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate", + ] + fileprivate static let serverInitialMetadataWithDeflateCompression: Self = [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.encoding.rawValue: "deflate", + GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate", + ] + fileprivate static let serverTrailers: Self = [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.grpcStatus.rawValue: "0", + ] +} + +final class GRPCStreamClientStateMachineTests: XCTestCase { + private func makeClientStateMachine( + targetState: TargetStateMachineState, + compressionEnabled: Bool = false + ) -> GRPCStreamStateMachine { + var stateMachine = GRPCStreamStateMachine( + configuration: .client( + .init( + methodDescriptor: .init(service: "test", method: "test"), + scheme: .http, + outboundEncoding: compressionEnabled ? .deflate : .identity, + acceptedEncodings: [.deflate] + ) + ), + maximumPayloadSize: 100, + skipAssertions: true + ) + + let serverMetadata: HPACKHeaders = + compressionEnabled ? .serverInitialMetadataWithDeflateCompression : .serverInitialMetadata + switch targetState { + case .clientIdleServerIdle: + break + case .clientOpenServerIdle: + // Open client + XCTAssertNoThrow(try stateMachine.send(metadata: [])) + case .clientOpenServerOpen: + // Open client + XCTAssertNoThrow(try stateMachine.send(metadata: [])) + // Open server + XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + case .clientOpenServerClosed: + // Open client + XCTAssertNoThrow(try stateMachine.send(metadata: [])) + // Open server + XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + // Close server + XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + case .clientClosedServerIdle: + // Open client + XCTAssertNoThrow(try stateMachine.send(metadata: [])) + // Close client + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + case .clientClosedServerOpen: + // Open client + XCTAssertNoThrow(try stateMachine.send(metadata: [])) + // Open server + XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + // Close client + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + case .clientClosedServerClosed: + // Open client + XCTAssertNoThrow(try stateMachine.send(metadata: [])) + // Open server + XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + // Close client + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + // Close server + XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + } + + return stateMachine + } + + // - MARK: Send Metadata + + func testSendMetadataWhenClientIdleAndServerIdle() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + XCTAssertNoThrow(try stateMachine.send(metadata: [])) + } + + func testSendMetadataWhenClientAlreadyOpen() throws { + for targetState in [ + TargetStateMachineState.clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Try sending metadata again: should throw + XCTAssertThrowsError(ofType: RPCError.self, try stateMachine.send(metadata: .init())) { + error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client is already open: shouldn't be sending metadata.") + } + } + } + + func testSendMetadataWhenClientAlreadyClosed() throws { + for targetState in [ + TargetStateMachineState.clientClosedServerIdle, .clientClosedServerOpen, + .clientClosedServerClosed, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Try sending metadata again: should throw + XCTAssertThrowsError(ofType: RPCError.self, try stateMachine.send(metadata: .init())) { + error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client is closed: can't send metadata.") + } + } + } + + // - MARK: Send Message + + func testSendMessageWhenClientIdleAndServerIdle() { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + // Try to send a message without opening (i.e. without sending initial metadata) + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client not yet open.") + } + } + + func testSendMessageWhenClientOpen() { + for targetState in [ + TargetStateMachineState.clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Now send a message + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false)) + } + } + + func testSendMessageWhenClientClosed() { + for targetState in [ + TargetStateMachineState.clientClosedServerIdle, .clientClosedServerOpen, + .clientClosedServerClosed, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Try sending another message: it should fail + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client is closed, cannot send a message.") + } + } + } + + // - MARK: Send Status and Trailers + + func testSendStatusAndTrailers() { + for targetState in TargetStateMachineState.allCases { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // This operation is never allowed on the client. + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send( + status: Status(code: .ok, message: ""), + metadata: .init() + ) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client cannot send status and trailer.") + } + } + } + + // - MARK: Receive initial metadata + + func testReceiveInitialMetadataWhenClientIdleAndServerIdle() { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server cannot have sent metadata if the client is idle.") + } + } + + func testReceiveInvalidInitialMetadataWhenServerIdle() throws { + for targetState in [ + TargetStateMachineState.clientOpenServerIdle, .clientClosedServerIdle, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Receive metadata with unexpected non-200 status code + let action = try stateMachine.receive( + metadata: [GRPCHTTP2Keys.status.rawValue: "300"], + endStream: false + ) + + XCTAssertEqual( + action, + .receivedStatusAndMetadata( + status: .init(code: .unknown, message: "Unexpected non-200 HTTP Status Code."), + metadata: [":status": "300"] + ) + ) + } + } + + func testReceiveInitialMetadataWhenServerIdle() throws { + for targetState in [ + TargetStateMachineState.clientOpenServerIdle, .clientClosedServerIdle, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Receive metadata = open server + let action = try stateMachine.receive( + metadata: [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.encoding.rawValue: "deflate", + "custom": "123", + "custom-bin": String(base64Encoding: [42, 43, 44]), + ], + endStream: false + ) + + var expectedMetadata: Metadata = [ + ":status": "200", + "content-type": "application/grpc", + "grpc-encoding": "deflate", + "custom": "123", + ] + expectedMetadata.addBinary([42, 43, 44], forKey: "custom-bin") + XCTAssertEqual(action, .receivedMetadata(expectedMetadata)) + } + } + + func testReceiveInitialMetadataWhenServerOpen() throws { + for targetState in [ + TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Receiving initial metadata again should throw if grpc-status is not present. + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive( + metadata: [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.encoding.rawValue: "deflate", + "custom": "123", + "custom-bin": String(base64Encoding: [42, 43, 44]), + ], + endStream: false + ) + ) { error in + XCTAssertEqual(error.code, .unknown) + XCTAssertEqual( + error.message, + "Non-initial metadata must be a trailer containing a valid grpc-status" + ) + } + + // Now make sure everything works well if we include grpc-status + let action = try stateMachine.receive( + metadata: [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.ok.rawValue), + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.encoding.rawValue: "deflate", + "custom": "123", + "custom-bin": String(base64Encoding: [42, 43, 44]), + ], + endStream: false + ) + + var expectedMetadata: Metadata = [ + ":status": "200", + "content-type": "application/grpc", + "grpc-encoding": "deflate", + "custom": "123", + ] + expectedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue) + expectedMetadata.addBinary([42, 43, 44], forKey: "custom-bin") + XCTAssertEqual( + action, + .receivedStatusAndMetadata( + status: Status(code: .ok, message: ""), + metadata: expectedMetadata + ) + ) + } + } + + func testReceiveInitialMetadataWhenServerClosed() { + for targetState in [TargetStateMachineState.clientOpenServerClosed, .clientClosedServerClosed] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server is closed, nothing could have been sent.") + } + } + } + + // - MARK: Receive end trailers + + func testReceiveEndTrailerWhenClientIdleAndServerIdle() { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + // Receive an end trailer + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .init(), endStream: true) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server cannot have sent metadata if the client is idle.") + } + } + + func testReceiveEndTrailerWhenClientOpenAndServerIdle() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle) + + // Receive a trailers-only response + let trailersOnlyResponse: HPACKHeaders = [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.internalError.rawValue), + GRPCHTTP2Keys.grpcStatusMessage.rawValue: GRPCStatusMessageMarshaller.marshall( + "Some status message" + )!, + "custom-key": "custom-value", + ] + let trailers = try stateMachine.receive(metadata: trailersOnlyResponse, endStream: true) + switch trailers { + case .receivedStatusAndMetadata(let status, let metadata): + XCTAssertEqual(status, Status(code: .internalError, message: "Some status message")) + XCTAssertEqual( + metadata, + [ + ":status": "200", + "content-type": "application/grpc", + "custom-key": "custom-value", + ] + ) + case .receivedMetadata, .doNothing, .rejectRPC: + XCTFail("Expected .receivedStatusAndMetadata") + } + } + + func testReceiveEndTrailerWhenServerOpen() throws { + for targetState in [TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + // Receive an end trailer + let action = try stateMachine.receive( + metadata: [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.ok.rawValue), + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.encoding.rawValue: "deflate", + "custom": "123", + ], + endStream: true + ) + + let expectedMetadata: Metadata = [ + ":status": "200", + "content-type": "application/grpc", + "grpc-encoding": "deflate", + "custom": "123", + ] + XCTAssertEqual( + action, + .receivedStatusAndMetadata( + status: .init(code: .ok, message: ""), + metadata: expectedMetadata + ) + ) + } + } + + func testReceiveEndTrailerWhenClientOpenAndServerClosed() { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerClosed) + + // Receive another end trailer + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .init(), endStream: true) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server is closed, nothing could have been sent.") + } + } + + func testReceiveEndTrailerWhenClientClosedAndServerIdle() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientClosedServerIdle) + + // Server sends a trailers-only response + let trailersOnlyResponse: HPACKHeaders = [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.internalError.rawValue), + GRPCHTTP2Keys.grpcStatusMessage.rawValue: GRPCStatusMessageMarshaller.marshall( + "Some status message" + )!, + "custom-key": "custom-value", + ] + let trailers = try stateMachine.receive(metadata: trailersOnlyResponse, endStream: true) + switch trailers { + case .receivedStatusAndMetadata(let status, let metadata): + XCTAssertEqual(status, Status(code: .internalError, message: "Some status message")) + XCTAssertEqual( + metadata, + [ + ":status": "200", + "content-type": "application/grpc", + "custom-key": "custom-value", + ] + ) + case .receivedMetadata, .doNothing, .rejectRPC: + XCTFail("Expected .receivedStatusAndMetadata") + } + } + + func testReceiveEndTrailerWhenClientClosedAndServerClosed() { + var stateMachine = self.makeClientStateMachine(targetState: .clientClosedServerClosed) + + // Close server again (endStream = true) and assert we don't throw. + // This can happen if the previous close was caused by a grpc-status header + // and then the server sends an empty frame with EOS set. + XCTAssertEqual(try stateMachine.receive(metadata: .init(), endStream: true), .doNothing) + } + + // - MARK: Receive message + + func testReceiveMessageWhenClientIdleAndServerIdle() { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Cannot have received anything from server if client is not yet open." + ) + } + } + + func testReceiveMessageWhenServerIdle() { + for targetState in [TargetStateMachineState.clientOpenServerIdle, .clientClosedServerIdle] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Server cannot have sent a message before sending the initial metadata." + ) + } + } + } + + func testReceiveMessageWhenServerOpen() throws { + for targetState in [TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + } + } + + func testReceiveMessageWhenServerClosed() { + for targetState in [TargetStateMachineState.clientOpenServerClosed, .clientClosedServerClosed] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Cannot have received anything from a closed server.") + } + } + } + + // - MARK: Next outbound message + + func testNextOutboundMessageWhenClientIdleAndServerIdle() { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.nextOutboundMessage() + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client is not open yet.") + } + } + + func testNextOutboundMessageWhenClientOpenAndServerOpenOrIdle() throws { + for targetState in [TargetStateMachineState.clientOpenServerIdle, .clientOpenServerOpen] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + + let expectedBytes: [UInt8] = [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ] + XCTAssertEqual( + try stateMachine.nextOutboundMessage(), + .sendMessage(ByteBuffer(bytes: expectedBytes)) + ) + + // And then make sure that nothing else is returned anymore + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + } + } + + func testNextOutboundMessageWhenClientOpenAndServerIdle_WithCompression() throws { + var stateMachine = self.makeClientStateMachine( + targetState: .clientOpenServerIdle, + compressionEnabled: true + ) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + let originalMessage = [UInt8]([42, 42, 43, 43]) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false)) + + let request = try stateMachine.nextOutboundMessage() + let framedMessage = try self.frameMessage(originalMessage, compress: true) + XCTAssertEqual(request, .sendMessage(framedMessage)) + } + + func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { + var stateMachine = self.makeClientStateMachine( + targetState: .clientOpenServerOpen, + compressionEnabled: true + ) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + let originalMessage = [UInt8]([42, 42, 43, 43]) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false)) + + let request = try stateMachine.nextOutboundMessage() + let framedMessage = try self.frameMessage(originalMessage, compress: true) + XCTAssertEqual(request, .sendMessage(framedMessage)) + } + + func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerClosed) + + // No more messages to send + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + + // Queue a message, but assert the action is .noMoreMessages nevertheless, + // because the server is closed. + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + } + + func testNextOutboundMessageWhenClientClosedAndServerIdle() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle) + + // Send a message and close client + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: true)) + + // Make sure that getting the next outbound message _does_ return the message + // we have enqueued. + let request = try stateMachine.nextOutboundMessage() + let expectedBytes: [UInt8] = [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ] + XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes))) + + // And then make sure that nothing else is returned anymore + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + } + + func testNextOutboundMessageWhenClientClosedAndServerOpen() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) + + // Send a message and close client + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: true)) + + // Make sure that getting the next outbound message _does_ return the message + // we have enqueued. + let request = try stateMachine.nextOutboundMessage() + let expectedBytes: [UInt8] = [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ] + XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes))) + + // And then make sure that nothing else is returned anymore + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + } + + func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) + // Send a message + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + + // Close server + XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + + // Close client + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + + // Even though we have enqueued a message, don't send it, because the server + // is closed. + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + } + + // - MARK: Next inbound message + + func testNextInboundMessageWhenServerIdle() { + for targetState in [ + TargetStateMachineState.clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle, + ] { + var stateMachine = self.makeClientStateMachine(targetState: targetState) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + } + + func testNextInboundMessageWhenClientOpenAndServerOpen() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { + var stateMachine = self.makeClientStateMachine( + targetState: .clientOpenServerOpen, + compressionEnabled: true + ) + + let originalMessage = [UInt8]([42, 42, 43, 43]) + let receivedBytes = try self.frameMessage(originalMessage, compress: true) + try stateMachine.receive(message: receivedBytes, endStream: false) + + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientOpenAndServerClosed() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + // Close server + XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + func testNextInboundMessageWhenClientClosedAndServerOpen() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + // Close client + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + + // Even though the client is closed, because it received a message while open, + // we must get the message now. + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientClosedAndServerClosed() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + // Close server + XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + + // Close client + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + + // Even though the client is closed, because it received a message while open, + // we must get the message now. + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + // - MARK: Common paths + + func testNormalFlow() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + // Client sends metadata + let clientInitialMetadata = try stateMachine.send(metadata: .init()) + XCTAssertEqual( + clientInitialMetadata, + [ + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.scheme.rawValue: "http", + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.te.rawValue: "trailers", + GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate", + ] + ) + + // Server sends initial metadata + let serverInitialHeadersAction = try stateMachine.receive( + metadata: .serverInitialMetadata, + endStream: false + ) + XCTAssertEqual( + serverInitialHeadersAction, + .receivedMetadata([ + ":status": "200", + "content-type": "application/grpc", + "grpc-accept-encoding": "deflate", + ]) + ) + + // Client sends messages + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + let message = [UInt8]([1, 2, 3, 4]) + let framedMessage = try self.frameMessage(message, compress: false) + try stateMachine.send(message: message, endStream: false) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + // Server sends response + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + + let firstResponseBytes = [UInt8]([5, 6, 7]) + let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) + let secondResponseBytes = [UInt8]([8, 9, 10]) + let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) + try stateMachine.receive(message: firstResponse, endStream: false) + try stateMachine.receive(message: secondResponse, endStream: false) + + // Make sure messages have arrived + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(secondResponseBytes)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + + // Client sends end + try stateMachine.send(message: [], endStream: true) + + // Server ends + let metadataReceivedAction = try stateMachine.receive( + metadata: .serverTrailers, + endStream: true + ) + let receivedMetadata = { + var m = Metadata(headers: .serverTrailers) + m.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue) + m.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue) + return m + }() + XCTAssertEqual( + metadataReceivedAction, + .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) + ) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + func testClientClosesBeforeServerOpens() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + // Client sends metadata + let clientInitialMetadata = try stateMachine.send(metadata: .init()) + XCTAssertEqual( + clientInitialMetadata, + [ + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.scheme.rawValue: "http", + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.te.rawValue: "trailers", + GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate", + ] + ) + + // Client sends messages and ends + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + let message = [UInt8]([1, 2, 3, 4]) + let framedMessage = try self.frameMessage(message, compress: false) + try stateMachine.send(message: message, endStream: true) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + + // Server sends initial metadata + let serverInitialHeadersAction = try stateMachine.receive( + metadata: .serverInitialMetadata, + endStream: false + ) + XCTAssertEqual( + serverInitialHeadersAction, + .receivedMetadata([ + ":status": "200", + "content-type": "application/grpc", + "grpc-accept-encoding": "deflate", + ]) + ) + + // Server sends response + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + + let firstResponseBytes = [UInt8]([5, 6, 7]) + let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) + let secondResponseBytes = [UInt8]([8, 9, 10]) + let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) + try stateMachine.receive(message: firstResponse, endStream: false) + try stateMachine.receive(message: secondResponse, endStream: false) + + // Make sure messages have arrived + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(secondResponseBytes)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + + // Server ends + let metadataReceivedAction = try stateMachine.receive( + metadata: .serverTrailers, + endStream: true + ) + let receivedMetadata = { + var m = Metadata(headers: .serverTrailers) + m.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue) + m.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue) + return m + }() + XCTAssertEqual( + metadataReceivedAction, + .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) + ) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + func testClientClosesBeforeServerResponds() throws { + var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle) + + // Client sends metadata + let clientInitialMetadata = try stateMachine.send(metadata: .init()) + XCTAssertEqual( + clientInitialMetadata, + [ + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.scheme.rawValue: "http", + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.te.rawValue: "trailers", + GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate", + ] + ) + + // Client sends messages + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + let message = [UInt8]([1, 2, 3, 4]) + let framedMessage = try self.frameMessage(message, compress: false) + try stateMachine.send(message: message, endStream: false) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + // Server sends initial metadata + let serverInitialHeadersAction = try stateMachine.receive( + metadata: .serverInitialMetadata, + endStream: false + ) + XCTAssertEqual( + serverInitialHeadersAction, + .receivedMetadata([ + ":status": "200", + "content-type": "application/grpc", + "grpc-accept-encoding": "deflate", + ]) + ) + + // Client sends end + try stateMachine.send(message: [], endStream: true) + + // Server sends response + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + + let firstResponseBytes = [UInt8]([5, 6, 7]) + let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) + let secondResponseBytes = [UInt8]([8, 9, 10]) + let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) + try stateMachine.receive(message: firstResponse, endStream: false) + try stateMachine.receive(message: secondResponse, endStream: false) + + // Make sure messages have arrived + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(secondResponseBytes)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + + // Server ends + let metadataReceivedAction = try stateMachine.receive( + metadata: .serverTrailers, + endStream: true + ) + let receivedMetadata = { + var m = Metadata(headers: .serverTrailers) + m.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue) + m.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue) + return m + }() + XCTAssertEqual( + metadataReceivedAction, + .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) + ) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } +} + +final class GRPCStreamServerStateMachineTests: XCTestCase { + private func makeServerStateMachine( + targetState: TargetStateMachineState, + compressionEnabled: Bool = false + ) -> GRPCStreamStateMachine { + + var stateMachine = GRPCStreamStateMachine( + configuration: .server( + .init( + scheme: .http, + acceptedEncodings: [.deflate] + ) + ), + maximumPayloadSize: 100, + skipAssertions: true + ) + + let clientMetadata: HPACKHeaders = + compressionEnabled ? .clientInitialMetadataWithDeflateCompression : .clientInitialMetadata + switch targetState { + case .clientIdleServerIdle: + break + case .clientOpenServerIdle: + // Open client + XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + case .clientOpenServerOpen: + // Open client + XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + // Open server + XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) + case .clientOpenServerClosed: + // Open client + XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + // Open server + XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) + // Close server + XCTAssertNoThrow( + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + ) + case .clientClosedServerIdle: + // Open client + XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + // Close client + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + case .clientClosedServerOpen: + // Open client + XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + // Open server + XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) + // Close client + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + case .clientClosedServerClosed: + // Open client + XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + // Open server + XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) + // Close client + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + // Close server + XCTAssertNoThrow( + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + ) + } + + return stateMachine + } + + // - MARK: Send Metadata + + func testSendMetadataWhenClientIdleAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(metadata: .init()) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Client cannot be idle if server is sending initial metadata: it must have opened." + ) + } + } + + func testSendMetadataWhenClientOpenAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + XCTAssertNoThrow(try stateMachine.send(metadata: .init())) + } + + func testSendMetadataWhenClientOpenAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + // Try sending metadata again: should throw + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(metadata: .init()) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server has already sent initial metadata.") + } + } + + func testSendMetadataWhenClientOpenAndServerClosed() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed) + + // Try sending metadata again: should throw + XCTAssertThrowsError(ofType: RPCError.self, try stateMachine.send(metadata: .init())) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server cannot send metadata if closed.") + } + } + + func testSendMetadataWhenClientClosedAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerIdle) + + // We should be allowed to send initial metadata if client is closed: + // client may be finished sending request but may still be awaiting response. + XCTAssertNoThrow(try stateMachine.send(metadata: .init())) + } + + func testSendMetadataWhenClientClosedAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) + + // Try sending metadata again: should throw + XCTAssertThrowsError(ofType: RPCError.self, try stateMachine.send(metadata: .init())) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server has already sent initial metadata.") + } + } + + func testSendMetadataWhenClientClosedAndServerClosed() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerClosed) + + // Try sending metadata again: should throw + XCTAssertThrowsError(ofType: RPCError.self, try stateMachine.send(metadata: .init())) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server cannot send metadata if closed.") + } + } + + // - MARK: Send Message + + func testSendMessageWhenClientIdleAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Server must have sent initial metadata before sending a message." + ) + } + } + + func testSendMessageWhenClientOpenAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + + // Now send a message + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Server must have sent initial metadata before sending a message." + ) + } + } + + func testSendMessageWhenClientOpenAndServerOpen() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + // Now send a message + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false)) + } + + func testSendMessageWhenClientOpenAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed) + + // Try sending another message: it should fail + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send a message if it's closed.") + } + } + + func testSendMessageWhenClientClosedAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Server must have sent initial metadata before sending a message." + ) + } + } + + func testSendMessageWhenClientClosedAndServerOpen() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) + + // Try sending a message: even though client is closed, we should send it + // because it may be expecting a response. + XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false)) + } + + func testSendMessageWhenClientClosedAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerClosed) + + // Try sending another message: it should fail + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send a message if it's closed.") + } + } + + // - MARK: Send Status and Trailers + + func testSendStatusAndTrailersWhenClientIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: .init() + ) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send status if client is idle.") + } + } + + func testSendStatusAndTrailersWhenClientOpenAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + + let trailers = try stateMachine.send( + status: .init(code: .unknown, message: "RPC unknown"), + metadata: .init() + ) + + // Make sure it's a trailers-only response: it must have :status header and content-type + XCTAssertEqual( + trailers, + [ + ":status": "200", + "content-type": "application/grpc", + "grpc-status": "2", + "grpc-status-message": "RPC unknown", + ] + ) + + // Try sending another message: it should fail because server is now closed. + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send a message if it's closed.") + } + } + + func testSendStatusAndTrailersWhenClientOpenAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + let trailers = try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: .init() + ) + + // Make sure it's NOT a trailers-only response, because the server was + // already open (so it sent initial metadata): it shouldn't have :status or content-type headers + XCTAssertEqual(trailers, ["grpc-status": "0"]) + + // Try sending another message: it should fail because server is now closed. + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send a message if it's closed.") + } + } + + func testSendStatusAndTrailersWhenClientOpenAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: .init() + ) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send anything if closed.") + } + } + + func testSendStatusAndTrailersWhenClientClosedAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerIdle) + + let trailers = try stateMachine.send( + status: .init(code: .unknown, message: "RPC unknown"), + metadata: .init() + ) + + // Make sure it's a trailers-only response: it must have :status header and content-type + XCTAssertEqual( + trailers, + [ + ":status": "200", + "content-type": "application/grpc", + "grpc-status": "2", + "grpc-status-message": "RPC unknown", + ] + ) + + // Try sending another message: it should fail because server is now closed. + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send a message if it's closed.") + } + } + + func testSendStatusAndTrailersWhenClientClosedAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) + + let trailers = try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: .init() + ) + + // Make sure it's NOT a trailers-only response, because the server was + // already open (so it sent initial metadata): it shouldn't have :status or content-type headers + XCTAssertEqual(trailers, ["grpc-status": "0"]) + + // Try sending another message: it should fail because server is now closed. + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send(message: [], endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send a message if it's closed.") + } + } + + func testSendStatusAndTrailersWhenClientClosedAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerClosed) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: .init() + ) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server can't send anything if closed.") + } + } + + // - MARK: Receive metadata + + func testReceiveMetadataWhenClientIdleAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + let action = try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + XCTAssertEqual( + action, + .receivedMetadata(Metadata(headers: .clientInitialMetadata)) + ) + } + + func testReceiveMetadataWhenClientIdleAndServerIdle_WithEndStream() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + let action = try stateMachine.receive(metadata: .clientInitialMetadata, endStream: true) + XCTAssertEqual( + action, + .receivedMetadata(Metadata(headers: .clientInitialMetadata)) + ) + } + + func testReceiveMetadataWhenClientIdleAndServerIdle_MissingContentType() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + let action = try stateMachine.receive( + metadata: .receivedWithoutContentType, + endStream: false + ) + + self.assertRejectedRPC(action) { trailers in + XCTAssertEqual(trailers.count, 1) + XCTAssertEqual(trailers.firstString(forKey: .status), "415") + } + } + + func testReceiveMetadataWhenClientIdleAndServerIdle_InvalidContentType() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + let action = try stateMachine.receive( + metadata: .receivedWithInvalidContentType, + endStream: false + ) + + self.assertRejectedRPC(action) { trailers in + XCTAssertEqual(trailers.count, 1) + XCTAssertEqual(trailers.firstString(forKey: .status), "415") + } + } + + func testReceiveMetadataWhenClientIdleAndServerIdle_MissingPath() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + let action = try stateMachine.receive( + metadata: .receivedWithoutEndpoint, + endStream: false + ) + + self.assertRejectedRPC(action) { trailers in + XCTAssertEqual( + trailers, + [ + ":status": "200", + "content-type": "application/grpc", + "grpc-status": "12", + "grpc-status-message": "No :path header has been set.", + ] + ) + } + } + + func testReceiveMetadataWhenClientIdleAndServerIdle_ServerUnsupportedEncoding() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + // Try opening client with a compression algorithm that is not accepted + // by the server. + let action = try stateMachine.receive( + metadata: .clientInitialMetadataWithGzipCompression, + endStream: false + ) + + self.assertRejectedRPC(action) { trailers in + XCTAssertEqual( + trailers, + [ + ":status": "200", + "content-type": "application/grpc", + "grpc-accept-encoding": "deflate", + "grpc-status": "12", + "grpc-status-message": + "gzip compression is not supported; supported algorithms are listed in grpc-accept-encoding", + ] + ) + } + } + + //TODO: add more encoding-related validation tests (for both client and server) + // and message encoding tests + + func testReceiveMetadataWhenClientOpenAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + + // Try receiving initial metadata again - should fail + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client shouldn't have sent metadata twice.") + } + } + + func testReceiveMetadataWhenClientOpenAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client shouldn't have sent metadata twice.") + } + } + + func testReceiveMetadataWhenClientOpenAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client shouldn't have sent metadata twice.") + } + } + + func testReceiveMetadataWhenClientClosedAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't have sent metadata if closed.") + } + } + + func testReceiveMetadataWhenClientClosedAndServerOpen() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't have sent metadata if closed.") + } + } + + func testReceiveMetadataWhenClientClosedAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerClosed) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't have sent metadata if closed.") + } + } + + // - MARK: Receive message + + func testReceiveMessageWhenClientIdleAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Can't have received a message if client is idle.") + } + } + + func testReceiveMessageWhenClientOpenAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + + // Receive messages successfully: the second one should close client. + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + + // Verify client is now closed + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't send a message if closed.") + } + } + + func testReceiveMessageWhenClientOpenAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + // Receive messages successfully: the second one should close client. + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + + // Verify client is now closed + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't send a message if closed.") + } + } + + func testReceiveMessageWhenClientOpenAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed) + + // Client is not done sending request, don't fail. + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) + } + + func testReceiveMessageWhenClientClosedAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't send a message if closed.") + } + } + + func testReceiveMessageWhenClientClosedAndServerOpen() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't send a message if closed.") + } + } + + func testReceiveMessageWhenClientClosedAndServerClosed() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerClosed) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.receive(message: .init(), endStream: false) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Client can't send a message if closed.") + } + } + + // - MARK: Next outbound message + + func testNextOutboundMessageWhenClientIdleAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.nextOutboundMessage() + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server is not open yet.") + } + } + + func testNextOutboundMessageWhenClientOpenAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.nextOutboundMessage() + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server is not open yet.") + } + } + + func testNextOutboundMessageWhenClientOpenAndServerIdle_WithCompression() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.nextOutboundMessage() + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server is not open yet.") + } + } + + func testNextOutboundMessageWhenClientOpenAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + + let response = try stateMachine.nextOutboundMessage() + let expectedBytes: [UInt8] = [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ] + XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + + // And then make sure that nothing else is returned + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + } + + func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { + var stateMachine = self.makeServerStateMachine( + targetState: .clientOpenServerOpen, + compressionEnabled: true + ) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + + let originalMessage = [UInt8]([42, 42, 43, 43]) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false)) + + let response = try stateMachine.nextOutboundMessage() + let framedMessage = try self.frameMessage(originalMessage, compress: true) + XCTAssertEqual(response, .sendMessage(framedMessage)) + } + + func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + // Send message and close server + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow( + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + ) + + let response = try stateMachine.nextOutboundMessage() + let expectedBytes: [UInt8] = [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ] + XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + + // And then make sure that nothing else is returned anymore + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + } + + func testNextOutboundMessageWhenClientClosedAndServerIdle() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerIdle) + + XCTAssertThrowsError( + ofType: RPCError.self, + try stateMachine.nextOutboundMessage() + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual(error.message, "Server is not open yet.") + } + } + + func testNextOutboundMessageWhenClientClosedAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + // Send a message + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + + // Close client + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + + // Send another message + XCTAssertNoThrow(try stateMachine.send(message: [43, 43], endStream: false)) + + // Make sure that getting the next outbound message _does_ return the message + // we have enqueued. + let response = try stateMachine.nextOutboundMessage() + let expectedBytes: [UInt8] = [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + // End of first message - beginning of second + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 43, 43, // original message + ] + XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + + // And then make sure that nothing else is returned anymore + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + } + + func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) + + // Send a message and close server + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow( + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + ) + + // We have enqueued a message, make sure we return it even though server is closed, + // because we haven't yet drained all of the pending messages. + let response = try stateMachine.nextOutboundMessage() + let expectedBytes: [UInt8] = [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ] + XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + + // And then make sure that nothing else is returned anymore + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + } + + // - MARK: Next inbound message + + func testNextInboundMessageWhenClientIdleAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientOpenAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientOpenAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { + var stateMachine = self.makeServerStateMachine( + targetState: .clientOpenServerOpen, + compressionEnabled: true + ) + + let originalMessage = [UInt8]([42, 42, 43, 43]) + let receivedBytes = try self.frameMessage(originalMessage, compress: true) + + try stateMachine.receive(message: receivedBytes, endStream: false) + + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientOpenAndServerClosed() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + // Close server + XCTAssertNoThrow( + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + ) + + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + } + + func testNextInboundMessageWhenClientClosedAndServerIdle() { + var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerIdle) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + func testNextInboundMessageWhenClientClosedAndServerOpen() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + // Close client + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + + // Even though the client is closed, because the server received a message + // while it was still open, we must get the message now. + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + func testNextInboundMessageWhenClientClosedAndServerClosed() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) + + let receivedBytes = ByteBuffer(bytes: [ + 0, // compression flag: unset + 0, 0, 0, 2, // message length: 2 bytes + 42, 42, // original message + ]) + try stateMachine.receive(message: receivedBytes, endStream: false) + + // Close server + XCTAssertNoThrow( + try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + ) + + // Close client + XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + + // Even though the client and server are closed, because the server received + // a message while the client was still open, we must get the message now. + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + // - MARK: Common paths + + func testNormalFlow() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + // Client sends metadata + let receiveMetadataAction = try stateMachine.receive( + metadata: .clientInitialMetadata, + endStream: false + ) + XCTAssertEqual( + receiveMetadataAction, + .receivedMetadata(Metadata(headers: .clientInitialMetadata)) + ) + + // Server sends initial metadata + let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"])) + XCTAssertEqual( + sentInitialHeaders, + [ + ":status": "200", + "content-type": "application/grpc", + "grpc-accept-encoding": "deflate", + "custom": "value", + ] + ) + + // Client sends messages + let deframedMessage = [UInt8]([1, 2, 3, 4]) + let completeMessage = try self.frameMessage(deframedMessage, compress: false) + // Split message into two parts to make sure the stitching together of the frames works well + let firstMessage = completeMessage.getSlice(at: 0, length: 4)! + let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! + + try stateMachine.receive(message: firstMessage, endStream: false) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + try stateMachine.receive(message: secondMessage, endStream: false) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) + + // Server sends response + let firstResponse = [UInt8]([5, 6, 7]) + let secondResponse = [UInt8]([8, 9, 10]) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + try stateMachine.send(message: firstResponse, endStream: false) + try stateMachine.send(message: secondResponse, endStream: false) + + // Make sure messages are outbound + let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + + // Client sends end + try stateMachine.receive(message: ByteBuffer(), endStream: true) + + // Server ends + let response = try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + XCTAssertEqual(response, ["grpc-status": "0"]) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + func testClientClosesBeforeServerOpens() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + // Client sends metadata + let receiveMetadataAction = try stateMachine.receive( + metadata: .clientInitialMetadata, + endStream: false + ) + XCTAssertEqual( + receiveMetadataAction, + .receivedMetadata(Metadata(headers: .clientInitialMetadata)) + ) + + // Client sends messages + let deframedMessage = [UInt8]([1, 2, 3, 4]) + let completeMessage = try self.frameMessage(deframedMessage, compress: false) + // Split message into two parts to make sure the stitching together of the frames works well + let firstMessage = completeMessage.getSlice(at: 0, length: 4)! + let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! + + try stateMachine.receive(message: firstMessage, endStream: false) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + try stateMachine.receive(message: secondMessage, endStream: false) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) + + // Client sends end + try stateMachine.receive(message: ByteBuffer(), endStream: true) + + // Server sends initial metadata + let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"])) + XCTAssertEqual( + sentInitialHeaders, + [ + "custom": "value", + ":status": "200", + "content-type": "application/grpc", + "grpc-accept-encoding": "deflate", + ] + ) + + // Server sends response + let firstResponse = [UInt8]([5, 6, 7]) + let secondResponse = [UInt8]([8, 9, 10]) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + try stateMachine.send(message: firstResponse, endStream: false) + try stateMachine.send(message: secondResponse, endStream: false) + + // Make sure messages are outbound + let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + + // Server ends + let response = try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + XCTAssertEqual(response, ["grpc-status": "0"]) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } + + func testClientClosesBeforeServerResponds() throws { + var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) + + // Client sends metadata + let receiveMetadataAction = try stateMachine.receive( + metadata: .clientInitialMetadata, + endStream: false + ) + XCTAssertEqual( + receiveMetadataAction, + .receivedMetadata(Metadata(headers: .clientInitialMetadata)) + ) + + // Client sends messages + let deframedMessage = [UInt8]([1, 2, 3, 4]) + let completeMessage = try self.frameMessage(deframedMessage, compress: false) + // Split message into two parts to make sure the stitching together of the frames works well + let firstMessage = completeMessage.getSlice(at: 0, length: 4)! + let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! + + try stateMachine.receive(message: firstMessage, endStream: false) + XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) + try stateMachine.receive(message: secondMessage, endStream: false) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) + + // Server sends initial metadata + let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"])) + XCTAssertEqual( + sentInitialHeaders, + [ + "custom": "value", + ":status": "200", + "content-type": "application/grpc", + "grpc-accept-encoding": "deflate", + ] + ) + + // Client sends end + try stateMachine.receive(message: ByteBuffer(), endStream: true) + + // Server sends response + let firstResponse = [UInt8]([5, 6, 7]) + let secondResponse = [UInt8]([8, 9, 10]) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + try stateMachine.send(message: firstResponse, endStream: false) + try stateMachine.send(message: secondResponse, endStream: false) + + // Make sure messages are outbound + let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + + // Server ends + let response = try stateMachine.send( + status: .init(code: .ok, message: ""), + metadata: [] + ) + XCTAssertEqual(response, ["grpc-status": "0"]) + + XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) + } +} + +extension XCTestCase { + func assertRejectedRPC( + _ action: GRPCStreamStateMachine.OnMetadataReceived, + expression: (HPACKHeaders) throws -> Void + ) rethrows { + guard case .rejectRPC(let trailers) = action else { + XCTFail("RPC should have been rejected.") + return + } + try expression(trailers) + } + + func frameMessage(_ message: [UInt8], compress: Bool) throws -> ByteBuffer { + try frameMessages([message], compress: compress) + } + + func frameMessages(_ messages: [[UInt8]], compress: Bool) throws -> ByteBuffer { + var framer = GRPCMessageFramer() + let compressor: Zlib.Compressor? = { + if compress { + return Zlib.Compressor(method: .deflate) + } else { + return nil + } + }() + defer { compressor?.end() } + for message in messages { + framer.append(message) + } + return try XCTUnwrap(framer.next(compressor: compressor)) + } +} diff --git a/Tests/GRPCHTTP2CoreTests/Internal/GRPCStatusMessageMarshallerTests.swift b/Tests/GRPCHTTP2CoreTests/Internal/GRPCStatusMessageMarshallerTests.swift new file mode 100644 index 000000000..ac659fad9 --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/Internal/GRPCStatusMessageMarshallerTests.swift @@ -0,0 +1,44 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import XCTest + +@testable import GRPCHTTP2Core + +class GRPCStatusMessageMarshallerTests: XCTestCase { + func testASCIIMarshallingAndUnmarshalling() { + XCTAssertEqual(GRPCStatusMessageMarshaller.marshall("Hello, World!"), "Hello, World!") + XCTAssertEqual(GRPCStatusMessageMarshaller.unmarshall("Hello, World!"), "Hello, World!") + } + + func testPercentMarshallingAndUnmarshalling() { + XCTAssertEqual(GRPCStatusMessageMarshaller.marshall("%"), "%25") + XCTAssertEqual(GRPCStatusMessageMarshaller.unmarshall("%25"), "%") + + XCTAssertEqual(GRPCStatusMessageMarshaller.marshall("25%"), "25%25") + XCTAssertEqual(GRPCStatusMessageMarshaller.unmarshall("25%25"), "25%") + } + + func testUnicodeMarshalling() { + XCTAssertEqual(GRPCStatusMessageMarshaller.marshall("🚀"), "%F0%9F%9A%80") + XCTAssertEqual(GRPCStatusMessageMarshaller.unmarshall("%F0%9F%9A%80"), "🚀") + + let message = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n" + let marshalled = + "%09%0Atest with whitespace%0D%0Aand Unicode BMP %E2%98%BA and non-BMP %F0%9F%98%88%09%0A" + XCTAssertEqual(GRPCStatusMessageMarshaller.marshall(message), marshalled) + XCTAssertEqual(GRPCStatusMessageMarshaller.unmarshall(marshalled), message) + } +}