diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 6db75f843..5be8f1025 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -143,24 +143,18 @@ extension GRPCClientStreamHandler { do { self.flushPending = true let headers = try self.stateMachine.send(metadata: metadata) - context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil) - // TODO: move the promise handling into the state machine - promise?.succeed() + context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise) } catch { - context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine promise?.fail(error) + context.fireErrorCaught(error) } case .message(let message): do { - try self.stateMachine.send(message: message) - // TODO: move the promise handling into the state machine - promise?.succeed() + try self.stateMachine.send(message: message, promise: promise) } catch { - context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine promise?.fail(error) + context.fireErrorCaught(error) } } } @@ -197,12 +191,12 @@ extension GRPCClientStreamHandler { private func _flush(context: ChannelHandlerContext) { do { loop: while true { - switch try self.stateMachine.nextOutboundMessage() { - case .sendMessage(let byteBuffer): + switch try self.stateMachine.nextOutboundFrame() { + case .sendFrame(let byteBuffer, let promise): self.flushPending = true context.write( self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))), - promise: nil + promise: promise ) case .noMoreMessages: diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index 590b5efeb..29da4b7dc 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -32,7 +32,7 @@ struct GRPCMessageFramer { /// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer. static let maximumWriteBufferLength = 65_536 - private var pendingMessages: OneOrManyQueue<[UInt8]> + private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise?)> private var writeBuffer: ByteBuffer @@ -44,8 +44,8 @@ struct GRPCMessageFramer { /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`. /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``. - mutating func append(_ bytes: [UInt8]) { - self.pendingMessages.append(bytes) + mutating func append(_ bytes: [UInt8], promise: EventLoopPromise?) { + self.pendingMessages.append((bytes, promise)) } /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data. @@ -53,7 +53,9 @@ struct GRPCMessageFramer { /// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise /// they'll be framed as-is. /// - Throws: If an error is encountered, such as a compression failure, an error will be thrown. - mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? { + mutating func next( + compressor: Zlib.Compressor? = nil + ) throws -> (bytes: ByteBuffer, promise: EventLoopPromise?)? { if self.pendingMessages.isEmpty { // Nothing pending: exit early. return nil @@ -69,15 +71,21 @@ struct GRPCMessageFramer { var requiredCapacity = 0 for message in self.pendingMessages { - requiredCapacity += message.count + Self.metadataLength + requiredCapacity += message.bytes.count + Self.metadataLength } self.writeBuffer.clear(minimumCapacity: requiredCapacity) + var pendingWritePromise: EventLoopPromise? while let message = self.pendingMessages.pop() { - try self.encode(message, compressor: compressor) + try self.encode(message.bytes, compressor: compressor) + if let existingPendingWritePromise = pendingWritePromise { + existingPendingWritePromise.futureResult.cascade(to: message.promise) + } else { + pendingWritePromise = message.promise + } } - return self.writeBuffer + return (bytes: self.writeBuffer, promise: pendingWritePromise) } private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws { diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 41961b568..1c8eca27f 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -325,12 +325,12 @@ struct GRPCStreamStateMachine { } } - mutating func send(message: [UInt8]) throws { + mutating func send(message: [UInt8], promise: EventLoopPromise?) throws { switch self.configuration { case .client: - try self.clientSend(message: message) + try self.clientSend(message: message, promise: promise) case .server: - try self.serverSend(message: message) + try self.serverSend(message: message, promise: promise) } } @@ -397,23 +397,26 @@ struct GRPCStreamStateMachine { } } - /// The result of requesting the next outbound message. - enum OnNextOutboundMessage: Equatable { - /// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done + /// The result of requesting the next outbound frame, which may contain multiple messages. + enum OnNextOutboundFrame { + /// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done /// writing messages (i.e. we are now closed). case noMoreMessages - /// There isn't a message ready to be sent, but we could still receive more, so keep trying. + /// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying. case awaitMoreMessages - /// A message is ready to be sent. - case sendMessage(ByteBuffer) + /// A frame is ready to be sent. + case sendFrame( + frame: ByteBuffer, + promise: EventLoopPromise? + ) } - mutating func nextOutboundMessage() throws -> OnNextOutboundMessage { + mutating func nextOutboundFrame() throws -> OnNextOutboundFrame { switch self.configuration { case .client: - return try self.clientNextOutboundMessage() + return try self.clientNextOutboundFrame() case .server: - return try self.serverNextOutboundMessage() + return try self.serverNextOutboundFrame() } } @@ -540,15 +543,15 @@ extension GRPCStreamStateMachine { } } - private mutating func clientSend(message: [UInt8]) throws { + private mutating func clientSend(message: [UInt8], promise: EventLoopPromise?) throws { switch self.state { case .clientIdleServerIdle: try self.invalidState("Client not yet open.") case .clientOpenServerIdle(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientOpenServerIdle(state) case .clientOpenServerOpen(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientOpenServerOpen(state) case .clientOpenServerClosed: // The server has closed, so it makes no sense to send the rest of the request. @@ -577,23 +580,25 @@ extension GRPCStreamStateMachine { /// Returns the client's next request to the server. /// - Returns: The request to be made to the server. - private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage { + private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame { switch self.state { case .clientIdleServerIdle: try self.invalidState("Client is not open yet.") case .clientOpenServerIdle(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerIdle(state) - return request.map { .sendMessage($0) } ?? .awaitMoreMessages + return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientOpenServerOpen(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerOpen(state) - return request.map { .sendMessage($0) } ?? .awaitMoreMessages + return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientClosedServerIdle(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerIdle(state) if let request { - return .sendMessage(request) + return .sendFrame(frame: request.bytes, promise: request.promise) } else { return .noMoreMessages } @@ -601,7 +606,7 @@ extension GRPCStreamStateMachine { let request = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerOpen(state) if let request { - return .sendMessage(request) + return .sendFrame(frame: request.bytes, promise: request.promise) } else { return .noMoreMessages } @@ -1003,17 +1008,17 @@ extension GRPCStreamStateMachine { } } - private mutating func serverSend(message: [UInt8]) throws { + private mutating func serverSend(message: [UInt8], promise: EventLoopPromise?) throws { switch self.state { case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle: try self.invalidState( "Server must have sent initial metadata before sending a message." ) case .clientOpenServerOpen(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientOpenServerOpen(state) case .clientClosedServerOpen(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientClosedServerOpen(state) case .clientOpenServerClosed, .clientClosedServerClosed: try self.invalidState( @@ -1351,23 +1356,25 @@ extension GRPCStreamStateMachine { } } - private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage { + private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame { switch self.state { case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle: try self.invalidState("Server is not open yet.") case .clientOpenServerOpen(var state): let response = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerOpen(state) - return response.map { .sendMessage($0) } ?? .awaitMoreMessages + return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientClosedServerOpen(var state): let response = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerOpen(state) - return response.map { .sendMessage($0) } ?? .awaitMoreMessages + return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientOpenServerClosed(var state): let response = try state.framer?.next(compressor: state.compressor) self.state = .clientOpenServerClosed(state) if let response { - return .sendMessage(response) + return .sendFrame(frame: response.bytes, promise: response.promise) } else { return .noMoreMessages } @@ -1375,7 +1382,7 @@ extension GRPCStreamStateMachine { let response = try state.framer?.next(compressor: state.compressor) self.state = .clientClosedServerClosed(state) if let response { - return .sendMessage(response) + return .sendFrame(frame: response.bytes, promise: response.promise) } else { return .noMoreMessages } diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index 22cd50330..14c576c20 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -34,7 +34,8 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler { // We buffer the final status + trailers to avoid reordering issues (i.e., // if there are messages still not written into the channel because flush has // not been called, but the server sends back trailers). - private var pendingTrailers: HTTP2Frame.FramePayload? + private var pendingTrailers: + (trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise?)? init( scheme: Scheme, @@ -142,37 +143,28 @@ extension GRPCServerStreamHandler { do { self.flushPending = true let headers = try self.stateMachine.send(metadata: metadata) - context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil) - // TODO: move the promise handling into the state machine - promise?.succeed() + context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise) } catch { - context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine promise?.fail(error) + context.fireErrorCaught(error) } case .message(let message): do { - try self.stateMachine.send(message: message) - // TODO: move the promise handling into the state machine - promise?.succeed() + try self.stateMachine.send(message: message, promise: promise) } catch { - context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine promise?.fail(error) + context.fireErrorCaught(error) } case .status(let status, let metadata): do { let headers = try self.stateMachine.send(status: status, metadata: metadata) let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true)) - self.pendingTrailers = response - // TODO: move the promise handling into the state machine - promise?.succeed() + self.pendingTrailers = (response, promise) } catch { - context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine promise?.fail(error) + context.fireErrorCaught(error) } } } @@ -185,19 +177,22 @@ extension GRPCServerStreamHandler { do { loop: while true { - switch try self.stateMachine.nextOutboundMessage() { - case .sendMessage(let byteBuffer): + switch try self.stateMachine.nextOutboundFrame() { + case .sendFrame(let byteBuffer, let promise): self.flushPending = true context.write( self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))), - promise: nil + promise: promise ) case .noMoreMessages: if let pendingTrailers = self.pendingTrailers { self.flushPending = true self.pendingTrailers = nil - context.write(self.wrapOutboundOut(pendingTrailers), promise: nil) + context.write( + self.wrapOutboundOut(pendingTrailers.trailers), + promise: pendingTrailers.promise + ) } break loop diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift index 98d5fc046..103c89f70 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift @@ -129,19 +129,19 @@ final class GRPCMessageDeframerTests: XCTestCase { } let firstMessage = try { - framer.append(Array(repeating: 42, count: 100)) + framer.append(Array(repeating: 42, count: 100), promise: nil) return try framer.next(compressor: compressor)! }() let secondMessage = try { - framer.append(Array(repeating: 43, count: 110)) + framer.append(Array(repeating: 43, count: 110), promise: nil) return try framer.next(compressor: compressor)! }() try ByteToMessageDecoderVerifier.verifyDecoder( inputOutputPairs: [ - (firstMessage, [Array(repeating: 42, count: 100)]), - (secondMessage, [Array(repeating: 43, count: 110)]), + (firstMessage.bytes, [Array(repeating: 42, count: 100)]), + (secondMessage.bytes, [Array(repeating: 43, count: 110)]), ]) { GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor) } @@ -164,12 +164,12 @@ final class GRPCMessageDeframerTests: XCTestCase { compressor.end() } - framer.append(Array(repeating: 42, count: 100)) + framer.append(Array(repeating: 42, count: 100), promise: nil) let framedMessage = try framer.next(compressor: compressor)! XCTAssertThrowsError( ofType: RPCError.self, - try processor.process(buffer: framedMessage) { _ in + try processor.process(buffer: framedMessage.bytes) { _ in XCTFail("No message should be produced.") } ) { error in @@ -178,7 +178,7 @@ final class GRPCMessageDeframerTests: XCTestCase { error.message, """ Message has exceeded the configured maximum payload size \ - (max: 1, actual: \(framedMessage.readableBytes - GRPCMessageDeframer.metadataLength)) + (max: 1, actual: \(framedMessage.bytes.readableBytes - GRPCMessageDeframer.metadataLength)) """ ) } @@ -195,12 +195,12 @@ final class GRPCMessageDeframerTests: XCTestCase { compressor.end() } - framer.append(Array(repeating: 42, count: 101)) + framer.append(Array(repeating: 42, count: 101), promise: nil) let framedMessage = try framer.next(compressor: compressor)! XCTAssertThrowsError( ofType: RPCError.self, - try processor.process(buffer: framedMessage) { _ in + try processor.process(buffer: framedMessage.bytes) { _ in XCTFail("No message should be produced.") } ) { error in diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift index 886e60319..bf9696e73 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift @@ -15,6 +15,7 @@ */ import NIOCore +import NIOEmbedded import XCTest @testable import GRPCHTTP2Core @@ -22,9 +23,9 @@ import XCTest final class GRPCMessageFramerTests: XCTestCase { func testSingleWrite() throws { var framer = GRPCMessageFramer() - framer.append(Array(repeating: 42, count: 128)) + framer.append(Array(repeating: 42, count: 128), promise: nil) - var buffer = try XCTUnwrap(framer.next()) + var buffer = try XCTUnwrap(framer.next()).bytes let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertFalse(compressed) XCTAssertEqual(length, 128) @@ -43,7 +44,7 @@ final class GRPCMessageFramerTests: XCTestCase { var framer = GRPCMessageFramer() let message = [UInt8](repeating: 42, count: 128) - framer.append(message) + framer.append(message, promise: nil) var buffer = ByteBuffer() let testCompressor = Zlib.Compressor(method: compressionMethod) @@ -53,7 +54,7 @@ final class GRPCMessageFramerTests: XCTestCase { testCompressor.end() } - buffer = try XCTUnwrap(framer.next(compressor: compressor)) + buffer = try XCTUnwrap(framer.next(compressor: compressor)).bytes let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertTrue(compressed) XCTAssertEqual(length, UInt32(compressedSize)) @@ -74,26 +75,47 @@ final class GRPCMessageFramerTests: XCTestCase { func testMultipleWrites() throws { var framer = GRPCMessageFramer() - - let messages = 100 - for _ in 0 ..< messages { - framer.append(Array(repeating: 42, count: 128)) + let eventLoop = EmbeddedEventLoop() + + // Create 100 messages and link a different promise with each of them. + let messagesCount = 100 + var promises = [EventLoopPromise]() + promises.reserveCapacity(messagesCount) + for _ in 0 ..< messagesCount { + let promise = eventLoop.makePromise(of: Void.self) + promises.append(promise) + framer.append(Array(repeating: 42, count: 128), promise: promise) } - var buffer = try XCTUnwrap(framer.next()) - for _ in 0 ..< messages { + let nextFrame = try XCTUnwrap(framer.next()) + + // Assert the messages have been framed all together in the same frame. + var buffer = nextFrame.bytes + for _ in 0 ..< messagesCount { let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertFalse(compressed) XCTAssertEqual(length, 128) XCTAssertEqual(buffer.readSlice(length: Int(length)), ByteBuffer(repeating: 42, count: 128)) } - XCTAssertEqual(buffer.readableBytes, 0) - // No more bufers. + // Assert the promise returned from the framer is the promise linked to the + // first message appended to the framer. + let returnedPromise = nextFrame.promise + XCTAssertEqual(returnedPromise?.futureResult, promises.first?.futureResult) + + // Succeed the returned promise to simulate a write into the channel + // succeeding, and assert that all other promises have been chained and are + // also succeeded as a result. + returnedPromise?.succeed() + XCTAssertEqual(promises.count, messagesCount) + for promise in promises { + try promise.futureResult.assertSuccess().wait() + } + + // No more frames. XCTAssertNil(try framer.next()) } - } extension ByteBuffer { diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 29261257f..324d47fdb 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -16,6 +16,7 @@ import GRPCCore import NIOCore +import NIOEmbedded import NIOHPACK import XCTest @@ -238,7 +239,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try to send a message without opening (i.e. without sending initial metadata) XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client not yet open.") @@ -252,7 +253,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: targetState) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [])) + XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) } } @@ -266,7 +267,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client is closed, cannot send a message.") @@ -624,7 +625,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client is not open yet.") @@ -635,9 +636,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { for targetState in [TargetStateMachineState.clientOpenServerIdle, .clientOpenServerOpen] { var stateMachine = self.makeClientStateMachine(targetState: targetState) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) let expectedBytes: [UInt8] = [ 0, // compression flag: unset @@ -645,12 +646,12 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 42, 42, // original message ] XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(ByteBuffer(bytes: expectedBytes)) + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil) ) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) } } @@ -660,14 +661,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { compressionEnabled: true ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(request, .sendMessage(framedMessage)) + XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { @@ -676,74 +677,74 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { compressionEnabled: true ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(request, .sendMessage(framedMessage)) + XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerClosed) // No more messages to send - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) // Queue a message, but assert the action is .noMoreMessages nevertheless, // because the server is closed. - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerIdle() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(request, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerOpen() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(request, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) @@ -753,7 +754,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Even though we have enqueued a message, don't send it, because the server // is closed. - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } // - MARK: Next inbound message @@ -887,13 +888,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ) // Client sends messages - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - try stateMachine.send(message: message) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + try stateMachine.send(message: message, promise: nil) + XCTAssertEqual( + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessage, promise: nil) + ) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) // Server sends response XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -929,7 +933,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -951,14 +955,17 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ) // Client sends messages and ends - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - XCTAssertNoThrow(try stateMachine.send(message: message)) + XCTAssertNoThrow(try stateMachine.send(message: message, promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual( + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessage, promise: nil) + ) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) // Server sends initial metadata let serverInitialHeadersAction = try stateMachine.receive( @@ -1005,7 +1012,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -1027,13 +1034,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ) // Client sends messages - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - try stateMachine.send(message: message) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + try stateMachine.send(message: message, promise: nil) + XCTAssertEqual( + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessage, promise: nil) + ) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) // Server sends initial metadata let serverInitialHeadersAction = try stateMachine.receive( @@ -1083,7 +1093,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } } @@ -1242,7 +1252,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1258,7 +1268,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Now send a message XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1272,7 +1282,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [])) + XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) } func testSendMessageWhenClientOpenAndServerClosed() { @@ -1281,7 +1291,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1293,7 +1303,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1308,7 +1318,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending a message: even though client is closed, we should send it // because it may be expecting a response. - XCTAssertNoThrow(try stateMachine.send(message: [])) + XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) } func testSendMessageWhenClientClosedAndServerClosed() { @@ -1317,7 +1327,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1363,7 +1373,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1385,7 +1395,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1429,7 +1439,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1451,7 +1461,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1873,7 +1883,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1885,7 +1895,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1897,7 +1907,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1907,20 +1917,20 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { func testNextOutboundMessageWhenClientOpenAndServerOpen() throws { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) } func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { @@ -1929,21 +1939,21 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { compressionEnabled: true ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(response, .sendMessage(framedMessage)) + XCTAssertEqual(response, .sendFrame(frame: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -1951,16 +1961,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) ) - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerIdle() throws { @@ -1968,7 +1978,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1979,17 +1989,17 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) // Close client XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Send another message - XCTAssertNoThrow(try stateMachine.send(message: [43, 43])) + XCTAssertNoThrow(try stateMachine.send(message: [43, 43], promise: nil)) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes @@ -1999,17 +2009,17 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 43, 43, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) // Send a message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -2019,16 +2029,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // We have enqueued a message, make sure we return it even though server is closed, // because we haven't yet drained all of the pending messages. - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } // - MARK: Next inbound message @@ -2185,15 +2195,35 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends response + let eventLoop = EmbeddedEventLoop() + let firstPromise = eventLoop.makePromise(of: Void.self) + let secondPromise = eventLoop.makePromise(of: Void.self) + let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse) - try stateMachine.send(message: secondResponse) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) + + try stateMachine.send(message: firstResponse, promise: firstPromise) + try stateMachine.send(message: secondResponse, promise: secondPromise) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + + guard + case .sendFrame(let nextOutboundByteBuffer, let nextOutboundPromise) = + try stateMachine.nextOutboundFrame() + else { + XCTFail("Should have received .sendMessage") + return + } + XCTAssertEqual(nextOutboundByteBuffer, framedMessages) + XCTAssertTrue(firstPromise.futureResult === nextOutboundPromise?.futureResult) + + // Make sure that the promises associated with each sent message are chained + // together: when succeeding the one returned by the state machine on + // `nextOutboundMessage()`, the others should also be succeeded. + firstPromise.succeed() + try secondPromise.futureResult.assertSuccess().wait() // Client sends end try stateMachine.receive(buffer: ByteBuffer(), endStream: true) @@ -2205,7 +2235,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) XCTAssertEqual(response, ["grpc-status": "0"]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -2252,13 +2282,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Server sends response let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse) - try stateMachine.send(message: secondResponse) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) + try stateMachine.send(message: firstResponse, promise: nil) + try stateMachine.send(message: secondResponse, promise: nil) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + XCTAssertEqual( + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessages, promise: nil) + ) // Server ends let response = try stateMachine.send( @@ -2267,7 +2300,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) XCTAssertEqual(response, ["grpc-status": "0"]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -2314,13 +2347,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Server sends response let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse) - try stateMachine.send(message: secondResponse) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) + try stateMachine.send(message: firstResponse, promise: nil) + try stateMachine.send(message: secondResponse, promise: nil) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + XCTAssertEqual( + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessages, promise: nil) + ) // Server ends let response = try stateMachine.send( @@ -2329,7 +2365,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) XCTAssertEqual(response, ["grpc-status": "0"]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } } @@ -2362,8 +2398,29 @@ extension XCTestCase { }() defer { compressor?.end() } for message in messages { - framer.append(message) + framer.append(message, promise: nil) + } + return try XCTUnwrap(framer.next(compressor: compressor)).bytes + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCStreamStateMachine.OnNextOutboundFrame: Equatable { + public static func == ( + lhs: GRPCStreamStateMachine.OnNextOutboundFrame, + rhs: GRPCStreamStateMachine.OnNextOutboundFrame + ) -> Bool { + switch (lhs, rhs) { + case (.noMoreMessages, .noMoreMessages): + return true + case (.awaitMoreMessages, .awaitMoreMessages): + return true + case (.sendFrame(let lhsMessage, _), .sendFrame(let rhsMessage, _)): + // Note that we're not comparing the EventLoopPromises here, as they're + // not Equatable. This is fine though, since we only use this in tests. + return lhsMessage == rhsMessage + default: + return false } - return try XCTUnwrap(framer.next(compressor: compressor)) } }