From 30d51fff49c69d1aee83d1f30a5e15824e4bf94e Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 26 Mar 2024 15:23:48 +0000 Subject: [PATCH 01/10] Handle promises --- Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 25 ++- .../GRPCStreamStateMachine.swift | 45 +++--- .../Server/GRPCServerStreamHandler.swift | 28 ++-- .../GRPCMessageDeframerTests.swift | 18 +-- .../GRPCMessageFramerTests.swift | 12 +- .../GRPCStreamStateMachineTests.swift | 147 +++++++++++------- 6 files changed, 163 insertions(+), 112 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index 590b5efeb..2be7a9035 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -32,7 +32,7 @@ struct GRPCMessageFramer { /// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer. static let maximumWriteBufferLength = 65_536 - private var pendingMessages: OneOrManyQueue<[UInt8]> + private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise?)> private var writeBuffer: ByteBuffer @@ -44,8 +44,13 @@ struct GRPCMessageFramer { /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`. /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``. - mutating func append(_ bytes: [UInt8]) { - self.pendingMessages.append(bytes) + mutating func append(_ bytes: [UInt8], promise: EventLoopPromise?) { + self.pendingMessages.append((bytes, promise)) + } + + struct DataFrame { + let bytes: ByteBuffer + let promise: EventLoopPromise? } /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data. @@ -53,7 +58,7 @@ struct GRPCMessageFramer { /// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise /// they'll be framed as-is. /// - Throws: If an error is encountered, such as a compression failure, an error will be thrown. - mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? { + mutating func next(compressor: Zlib.Compressor? = nil) throws -> DataFrame? { if self.pendingMessages.isEmpty { // Nothing pending: exit early. return nil @@ -69,15 +74,21 @@ struct GRPCMessageFramer { var requiredCapacity = 0 for message in self.pendingMessages { - requiredCapacity += message.count + Self.metadataLength + requiredCapacity += message.bytes.count + Self.metadataLength } self.writeBuffer.clear(minimumCapacity: requiredCapacity) + var pendingWritePromise: EventLoopPromise? while let message = self.pendingMessages.pop() { - try self.encode(message, compressor: compressor) + try self.encode(message.bytes, compressor: compressor) + if pendingWritePromise == nil { + pendingWritePromise = message.promise + } else { + pendingWritePromise?.futureResult.cascade(to: message.promise) + } } - return self.writeBuffer + return DataFrame(bytes: self.writeBuffer, promise: pendingWritePromise) } private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws { diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 41961b568..883c96dfa 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -325,12 +325,12 @@ struct GRPCStreamStateMachine { } } - mutating func send(message: [UInt8]) throws { + mutating func send(message: [UInt8], promise: EventLoopPromise?) throws { switch self.configuration { case .client: - try self.clientSend(message: message) + try self.clientSend(message: message, promise: promise) case .server: - try self.serverSend(message: message) + try self.serverSend(message: message, promise: promise) } } @@ -398,14 +398,17 @@ struct GRPCStreamStateMachine { } /// The result of requesting the next outbound message. - enum OnNextOutboundMessage: Equatable { + enum OnNextOutboundMessage { /// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done /// writing messages (i.e. we are now closed). case noMoreMessages /// There isn't a message ready to be sent, but we could still receive more, so keep trying. case awaitMoreMessages /// A message is ready to be sent. - case sendMessage(ByteBuffer) + case sendMessage( + message: ByteBuffer, + promise: EventLoopPromise? + ) } mutating func nextOutboundMessage() throws -> OnNextOutboundMessage { @@ -540,15 +543,15 @@ extension GRPCStreamStateMachine { } } - private mutating func clientSend(message: [UInt8]) throws { + private mutating func clientSend(message: [UInt8], promise: EventLoopPromise?) throws { switch self.state { case .clientIdleServerIdle: try self.invalidState("Client not yet open.") case .clientOpenServerIdle(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientOpenServerIdle(state) case .clientOpenServerOpen(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientOpenServerOpen(state) case .clientOpenServerClosed: // The server has closed, so it makes no sense to send the rest of the request. @@ -584,16 +587,18 @@ extension GRPCStreamStateMachine { case .clientOpenServerIdle(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerIdle(state) - return request.map { .sendMessage($0) } ?? .awaitMoreMessages + return request.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientOpenServerOpen(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerOpen(state) - return request.map { .sendMessage($0) } ?? .awaitMoreMessages + return request.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientClosedServerIdle(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerIdle(state) if let request { - return .sendMessage(request) + return .sendMessage(message: request.bytes, promise: request.promise) } else { return .noMoreMessages } @@ -601,7 +606,7 @@ extension GRPCStreamStateMachine { let request = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerOpen(state) if let request { - return .sendMessage(request) + return .sendMessage(message: request.bytes, promise: request.promise) } else { return .noMoreMessages } @@ -1003,17 +1008,17 @@ extension GRPCStreamStateMachine { } } - private mutating func serverSend(message: [UInt8]) throws { + private mutating func serverSend(message: [UInt8], promise: EventLoopPromise?) throws { switch self.state { case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle: try self.invalidState( "Server must have sent initial metadata before sending a message." ) case .clientOpenServerOpen(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientOpenServerOpen(state) case .clientClosedServerOpen(var state): - state.framer.append(message) + state.framer.append(message, promise: promise) self.state = .clientClosedServerOpen(state) case .clientOpenServerClosed, .clientClosedServerClosed: try self.invalidState( @@ -1358,16 +1363,18 @@ extension GRPCStreamStateMachine { case .clientOpenServerOpen(var state): let response = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerOpen(state) - return response.map { .sendMessage($0) } ?? .awaitMoreMessages + return response.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientClosedServerOpen(var state): let response = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerOpen(state) - return response.map { .sendMessage($0) } ?? .awaitMoreMessages + return response.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + ?? .awaitMoreMessages case .clientOpenServerClosed(var state): let response = try state.framer?.next(compressor: state.compressor) self.state = .clientOpenServerClosed(state) if let response { - return .sendMessage(response) + return .sendMessage(message: response.bytes, promise: response.promise) } else { return .noMoreMessages } @@ -1375,7 +1382,7 @@ extension GRPCStreamStateMachine { let response = try state.framer?.next(compressor: state.compressor) self.state = .clientClosedServerClosed(state) if let response { - return .sendMessage(response) + return .sendMessage(message: response.bytes, promise: response.promise) } else { return .noMoreMessages } diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index 22cd50330..c0e8e63d1 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -35,6 +35,7 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler { // if there are messages still not written into the channel because flush has // not been called, but the server sends back trailers). private var pendingTrailers: HTTP2Frame.FramePayload? + private var pendingTrailersPromise: EventLoopPromise? init( scheme: Scheme, @@ -142,24 +143,17 @@ extension GRPCServerStreamHandler { do { self.flushPending = true let headers = try self.stateMachine.send(metadata: metadata) - context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil) - // TODO: move the promise handling into the state machine - promise?.succeed() + context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise) } catch { context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine promise?.fail(error) } case .message(let message): do { - try self.stateMachine.send(message: message) - // TODO: move the promise handling into the state machine - promise?.succeed() + try self.stateMachine.send(message: message, promise: promise) } catch { context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine - promise?.fail(error) } case .status(let status, let metadata): @@ -167,12 +161,9 @@ extension GRPCServerStreamHandler { let headers = try self.stateMachine.send(status: status, metadata: metadata) let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true)) self.pendingTrailers = response - // TODO: move the promise handling into the state machine - promise?.succeed() + self.pendingTrailersPromise = promise } catch { context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine - promise?.fail(error) } } } @@ -186,18 +177,21 @@ extension GRPCServerStreamHandler { do { loop: while true { switch try self.stateMachine.nextOutboundMessage() { - case .sendMessage(let byteBuffer): + case .sendMessage(let byteBuffer, let promise): self.flushPending = true context.write( self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))), - promise: nil + promise: promise ) case .noMoreMessages: - if let pendingTrailers = self.pendingTrailers { + if let pendingTrailers = self.pendingTrailers, + let pendingTrailersPromise = self.pendingTrailersPromise + { self.flushPending = true self.pendingTrailers = nil - context.write(self.wrapOutboundOut(pendingTrailers), promise: nil) + self.pendingTrailersPromise = nil + context.write(self.wrapOutboundOut(pendingTrailers), promise: pendingTrailersPromise) } break loop diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift index 98d5fc046..103c89f70 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift @@ -129,19 +129,19 @@ final class GRPCMessageDeframerTests: XCTestCase { } let firstMessage = try { - framer.append(Array(repeating: 42, count: 100)) + framer.append(Array(repeating: 42, count: 100), promise: nil) return try framer.next(compressor: compressor)! }() let secondMessage = try { - framer.append(Array(repeating: 43, count: 110)) + framer.append(Array(repeating: 43, count: 110), promise: nil) return try framer.next(compressor: compressor)! }() try ByteToMessageDecoderVerifier.verifyDecoder( inputOutputPairs: [ - (firstMessage, [Array(repeating: 42, count: 100)]), - (secondMessage, [Array(repeating: 43, count: 110)]), + (firstMessage.bytes, [Array(repeating: 42, count: 100)]), + (secondMessage.bytes, [Array(repeating: 43, count: 110)]), ]) { GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor) } @@ -164,12 +164,12 @@ final class GRPCMessageDeframerTests: XCTestCase { compressor.end() } - framer.append(Array(repeating: 42, count: 100)) + framer.append(Array(repeating: 42, count: 100), promise: nil) let framedMessage = try framer.next(compressor: compressor)! XCTAssertThrowsError( ofType: RPCError.self, - try processor.process(buffer: framedMessage) { _ in + try processor.process(buffer: framedMessage.bytes) { _ in XCTFail("No message should be produced.") } ) { error in @@ -178,7 +178,7 @@ final class GRPCMessageDeframerTests: XCTestCase { error.message, """ Message has exceeded the configured maximum payload size \ - (max: 1, actual: \(framedMessage.readableBytes - GRPCMessageDeframer.metadataLength)) + (max: 1, actual: \(framedMessage.bytes.readableBytes - GRPCMessageDeframer.metadataLength)) """ ) } @@ -195,12 +195,12 @@ final class GRPCMessageDeframerTests: XCTestCase { compressor.end() } - framer.append(Array(repeating: 42, count: 101)) + framer.append(Array(repeating: 42, count: 101), promise: nil) let framedMessage = try framer.next(compressor: compressor)! XCTAssertThrowsError( ofType: RPCError.self, - try processor.process(buffer: framedMessage) { _ in + try processor.process(buffer: framedMessage.bytes) { _ in XCTFail("No message should be produced.") } ) { error in diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift index 886e60319..6d61af763 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift @@ -22,9 +22,9 @@ import XCTest final class GRPCMessageFramerTests: XCTestCase { func testSingleWrite() throws { var framer = GRPCMessageFramer() - framer.append(Array(repeating: 42, count: 128)) + framer.append(Array(repeating: 42, count: 128), promise: nil) - var buffer = try XCTUnwrap(framer.next()) + var buffer = try XCTUnwrap(framer.next()).bytes let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertFalse(compressed) XCTAssertEqual(length, 128) @@ -43,7 +43,7 @@ final class GRPCMessageFramerTests: XCTestCase { var framer = GRPCMessageFramer() let message = [UInt8](repeating: 42, count: 128) - framer.append(message) + framer.append(message, promise: nil) var buffer = ByteBuffer() let testCompressor = Zlib.Compressor(method: compressionMethod) @@ -53,7 +53,7 @@ final class GRPCMessageFramerTests: XCTestCase { testCompressor.end() } - buffer = try XCTUnwrap(framer.next(compressor: compressor)) + buffer = try XCTUnwrap(framer.next(compressor: compressor)).bytes let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertTrue(compressed) XCTAssertEqual(length, UInt32(compressedSize)) @@ -77,10 +77,10 @@ final class GRPCMessageFramerTests: XCTestCase { let messages = 100 for _ in 0 ..< messages { - framer.append(Array(repeating: 42, count: 128)) + framer.append(Array(repeating: 42, count: 128), promise: nil) } - var buffer = try XCTUnwrap(framer.next()) + var buffer = try XCTUnwrap(framer.next()).bytes for _ in 0 ..< messages { let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertFalse(compressed) diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 29261257f..543b8dadc 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -238,7 +238,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try to send a message without opening (i.e. without sending initial metadata) XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client not yet open.") @@ -252,7 +252,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: targetState) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [])) + XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) } } @@ -266,7 +266,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client is closed, cannot send a message.") @@ -637,7 +637,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) let expectedBytes: [UInt8] = [ 0, // compression flag: unset @@ -646,7 +646,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ] XCTAssertEqual( try stateMachine.nextOutboundMessage(), - .sendMessage(ByteBuffer(bytes: expectedBytes)) + .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil) ) // And then make sure that nothing else is returned anymore @@ -663,11 +663,11 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) let request = try stateMachine.nextOutboundMessage() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(request, .sendMessage(framedMessage)) + XCTAssertEqual(request, .sendMessage(message: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { @@ -679,11 +679,11 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) let request = try stateMachine.nextOutboundMessage() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(request, .sendMessage(framedMessage)) + XCTAssertEqual(request, .sendMessage(message: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { @@ -694,7 +694,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Queue a message, but assert the action is .noMoreMessages nevertheless, // because the server is closed. - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) } @@ -702,7 +702,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message @@ -713,7 +713,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(request, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) @@ -723,7 +723,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message @@ -734,7 +734,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(request, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) @@ -743,7 +743,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) @@ -891,8 +891,11 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - try stateMachine.send(message: message) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) + try stateMachine.send(message: message, promise: nil) + XCTAssertEqual( + try stateMachine.nextOutboundMessage(), + .sendMessage(message: framedMessage, promise: nil) + ) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) // Server sends response @@ -955,9 +958,12 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - XCTAssertNoThrow(try stateMachine.send(message: message)) + XCTAssertNoThrow(try stateMachine.send(message: message, promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) + XCTAssertEqual( + try stateMachine.nextOutboundMessage(), + .sendMessage(message: framedMessage, promise: nil) + ) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) // Server sends initial metadata @@ -1031,8 +1037,11 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - try stateMachine.send(message: message) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) + try stateMachine.send(message: message, promise: nil) + XCTAssertEqual( + try stateMachine.nextOutboundMessage(), + .sendMessage(message: framedMessage, promise: nil) + ) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) // Server sends initial metadata @@ -1242,7 +1251,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1258,7 +1267,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Now send a message XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1272,7 +1281,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [])) + XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) } func testSendMessageWhenClientOpenAndServerClosed() { @@ -1281,7 +1290,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1293,7 +1302,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1308,7 +1317,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending a message: even though client is closed, we should send it // because it may be expecting a response. - XCTAssertNoThrow(try stateMachine.send(message: [])) + XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) } func testSendMessageWhenClientClosedAndServerClosed() { @@ -1317,7 +1326,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1363,7 +1372,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1385,7 +1394,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1429,7 +1438,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1451,7 +1460,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: []) + try stateMachine.send(message: [], promise: nil) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1909,7 +1918,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) let response = try stateMachine.nextOutboundMessage() let expectedBytes: [UInt8] = [ @@ -1917,7 +1926,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) @@ -1932,18 +1941,18 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) let response = try stateMachine.nextOutboundMessage() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(response, .sendMessage(framedMessage)) + XCTAssertEqual(response, .sendMessage(message: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -1957,7 +1966,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) @@ -1979,13 +1988,13 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) // Close client XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Send another message - XCTAssertNoThrow(try stateMachine.send(message: [43, 43])) + XCTAssertNoThrow(try stateMachine.send(message: [43, 43], promise: nil)) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. @@ -1999,7 +2008,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 43, 43, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) @@ -2009,7 +2018,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) // Send a message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -2025,7 +2034,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes))) + XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) @@ -2188,12 +2197,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse) - try stateMachine.send(message: secondResponse) + try stateMachine.send(message: firstResponse, promise: nil) + try stateMachine.send(message: secondResponse, promise: nil) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + XCTAssertEqual( + try stateMachine.nextOutboundMessage(), + .sendMessage(message: framedMessages, promise: nil) + ) // Client sends end try stateMachine.receive(buffer: ByteBuffer(), endStream: true) @@ -2253,12 +2265,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse) - try stateMachine.send(message: secondResponse) + try stateMachine.send(message: firstResponse, promise: nil) + try stateMachine.send(message: secondResponse, promise: nil) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + XCTAssertEqual( + try stateMachine.nextOutboundMessage(), + .sendMessage(message: framedMessages, promise: nil) + ) // Server ends let response = try stateMachine.send( @@ -2315,12 +2330,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse) - try stateMachine.send(message: secondResponse) + try stateMachine.send(message: firstResponse, promise: nil) + try stateMachine.send(message: secondResponse, promise: nil) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) + XCTAssertEqual( + try stateMachine.nextOutboundMessage(), + .sendMessage(message: framedMessages, promise: nil) + ) // Server ends let response = try stateMachine.send( @@ -2362,8 +2380,29 @@ extension XCTestCase { }() defer { compressor?.end() } for message in messages { - framer.append(message) + framer.append(message, promise: nil) + } + return try XCTUnwrap(framer.next(compressor: compressor)).bytes + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCStreamStateMachine.OnNextOutboundMessage: Equatable { + public static func == ( + lhs: GRPCStreamStateMachine.OnNextOutboundMessage, + rhs: GRPCStreamStateMachine.OnNextOutboundMessage + ) -> Bool { + switch (lhs, rhs) { + case (.noMoreMessages, .noMoreMessages): + return true + case (.awaitMoreMessages, .awaitMoreMessages): + return true + case (.sendMessage(let lhsMessage, _), .sendMessage(let rhsMessage, _)): + // Note that we're not comparing the EventLoopPromises here, as they're + // not Equatable. This is fine though, since we only use this in tests. + return lhsMessage == rhsMessage + default: + return false } - return try XCTUnwrap(framer.next(compressor: compressor)) } } From 35adc524a30d109cbb8c550363c9a6d01ad57137 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 26 Mar 2024 16:18:05 +0000 Subject: [PATCH 02/10] Fix client stream handler --- .../Client/GRPCClientStreamHandler.swift | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 6db75f843..7bf410f2a 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -143,23 +143,16 @@ extension GRPCClientStreamHandler { do { self.flushPending = true let headers = try self.stateMachine.send(metadata: metadata) - context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil) - // TODO: move the promise handling into the state machine - promise?.succeed() + context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise) } catch { context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine - promise?.fail(error) } case .message(let message): do { - try self.stateMachine.send(message: message) - // TODO: move the promise handling into the state machine - promise?.succeed() + try self.stateMachine.send(message: message, promise: promise) } catch { context.fireErrorCaught(error) - // TODO: move the promise handling into the state machine promise?.fail(error) } } @@ -198,11 +191,11 @@ extension GRPCClientStreamHandler { do { loop: while true { switch try self.stateMachine.nextOutboundMessage() { - case .sendMessage(let byteBuffer): + case .sendMessage(let byteBuffer, let promise): self.flushPending = true context.write( self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))), - promise: nil + promise: promise ) case .noMoreMessages: From 1b53af9d44c7e79a09b2636dd4fcfa772c320201 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 26 Mar 2024 16:18:08 +0000 Subject: [PATCH 03/10] Add test --- .../GRPCStreamStateMachineTests.swift | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 543b8dadc..195c9ff8f 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -16,6 +16,7 @@ import GRPCCore import NIOCore +import NIOEmbedded import NIOHPACK import XCTest @@ -2194,18 +2195,35 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends response + let eventLoop = EmbeddedEventLoop() + let firstPromise = eventLoop.makePromise(of: Void.self) + let secondPromise = eventLoop.makePromise(of: Void.self) + let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse, promise: nil) - try stateMachine.send(message: secondResponse, promise: nil) + + try stateMachine.send(message: firstResponse, promise: firstPromise) + try stateMachine.send(message: secondResponse, promise: secondPromise) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) - XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(message: framedMessages, promise: nil) - ) + + guard + case .sendMessage(let nextOutboundByteBuffer, let nextOutboundPromise) = + try stateMachine.nextOutboundMessage() + else { + XCTFail("Should have received .sendMessage") + return + } + XCTAssertEqual(nextOutboundByteBuffer, framedMessages) + XCTAssertTrue(firstPromise.futureResult === nextOutboundPromise?.futureResult) + + // Make sure that the promises associated with each sent message are chained + // together: when succeeding the one returned by the state machine on + // `nextOutboundMessage()`, the others should also be succeeded. + firstPromise.succeed() + try secondPromise.futureResult.assertSuccess().wait() // Client sends end try stateMachine.receive(buffer: ByteBuffer(), endStream: true) From 73a1876934b73de918f7f0131071ac74deb94bd9 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 10:41:58 +0000 Subject: [PATCH 04/10] Add missing promise failure --- Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index c0e8e63d1..3364ba9be 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -164,6 +164,7 @@ extension GRPCServerStreamHandler { self.pendingTrailersPromise = promise } catch { context.fireErrorCaught(error) + promise?.fail(error) } } } From 1d11865fb9353cc9e529f44dcc3fea8d037d99f8 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 13:50:23 +0000 Subject: [PATCH 05/10] Replace GRPCMessageFramer/DataFrame with tuple --- Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index 2be7a9035..042ef6adb 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -48,17 +48,14 @@ struct GRPCMessageFramer { self.pendingMessages.append((bytes, promise)) } - struct DataFrame { - let bytes: ByteBuffer - let promise: EventLoopPromise? - } - /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data. /// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`. /// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise /// they'll be framed as-is. /// - Throws: If an error is encountered, such as a compression failure, an error will be thrown. - mutating func next(compressor: Zlib.Compressor? = nil) throws -> DataFrame? { + mutating func next( + compressor: Zlib.Compressor? = nil + ) throws -> (bytes: ByteBuffer, promise: EventLoopPromise?)? { if self.pendingMessages.isEmpty { // Nothing pending: exit early. return nil @@ -88,7 +85,7 @@ struct GRPCMessageFramer { } } - return DataFrame(bytes: self.writeBuffer, promise: pendingWritePromise) + return (bytes: self.writeBuffer, promise: pendingWritePromise) } private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws { From 66eddc10a0e196c53d6deb78881228cf551c7d85 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 13:59:21 +0000 Subject: [PATCH 06/10] Rename outbound messages to outbound frames --- .../Client/GRPCClientStreamHandler.swift | 4 +- .../GRPCStreamStateMachine.swift | 40 ++--- .../Server/GRPCServerStreamHandler.swift | 4 +- .../GRPCStreamStateMachineTests.swift | 142 +++++++++--------- 4 files changed, 95 insertions(+), 95 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 7bf410f2a..d276961ed 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -190,8 +190,8 @@ extension GRPCClientStreamHandler { private func _flush(context: ChannelHandlerContext) { do { loop: while true { - switch try self.stateMachine.nextOutboundMessage() { - case .sendMessage(let byteBuffer, let promise): + switch try self.stateMachine.nextOutboundFrame() { + case .sendFrame(let byteBuffer, let promise): self.flushPending = true context.write( self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))), diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 883c96dfa..1c8eca27f 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -397,26 +397,26 @@ struct GRPCStreamStateMachine { } } - /// The result of requesting the next outbound message. - enum OnNextOutboundMessage { - /// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done + /// The result of requesting the next outbound frame, which may contain multiple messages. + enum OnNextOutboundFrame { + /// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done /// writing messages (i.e. we are now closed). case noMoreMessages - /// There isn't a message ready to be sent, but we could still receive more, so keep trying. + /// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying. case awaitMoreMessages - /// A message is ready to be sent. - case sendMessage( - message: ByteBuffer, + /// A frame is ready to be sent. + case sendFrame( + frame: ByteBuffer, promise: EventLoopPromise? ) } - mutating func nextOutboundMessage() throws -> OnNextOutboundMessage { + mutating func nextOutboundFrame() throws -> OnNextOutboundFrame { switch self.configuration { case .client: - return try self.clientNextOutboundMessage() + return try self.clientNextOutboundFrame() case .server: - return try self.serverNextOutboundMessage() + return try self.serverNextOutboundFrame() } } @@ -580,25 +580,25 @@ extension GRPCStreamStateMachine { /// Returns the client's next request to the server. /// - Returns: The request to be made to the server. - private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage { + private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame { switch self.state { case .clientIdleServerIdle: try self.invalidState("Client is not open yet.") case .clientOpenServerIdle(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerIdle(state) - return request.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } ?? .awaitMoreMessages case .clientOpenServerOpen(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerOpen(state) - return request.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } ?? .awaitMoreMessages case .clientClosedServerIdle(var state): let request = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerIdle(state) if let request { - return .sendMessage(message: request.bytes, promise: request.promise) + return .sendFrame(frame: request.bytes, promise: request.promise) } else { return .noMoreMessages } @@ -606,7 +606,7 @@ extension GRPCStreamStateMachine { let request = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerOpen(state) if let request { - return .sendMessage(message: request.bytes, promise: request.promise) + return .sendFrame(frame: request.bytes, promise: request.promise) } else { return .noMoreMessages } @@ -1356,25 +1356,25 @@ extension GRPCStreamStateMachine { } } - private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage { + private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame { switch self.state { case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle: try self.invalidState("Server is not open yet.") case .clientOpenServerOpen(var state): let response = try state.framer.next(compressor: state.compressor) self.state = .clientOpenServerOpen(state) - return response.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } ?? .awaitMoreMessages case .clientClosedServerOpen(var state): let response = try state.framer.next(compressor: state.compressor) self.state = .clientClosedServerOpen(state) - return response.map { .sendMessage(message: $0.bytes, promise: $0.promise) } + return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) } ?? .awaitMoreMessages case .clientOpenServerClosed(var state): let response = try state.framer?.next(compressor: state.compressor) self.state = .clientOpenServerClosed(state) if let response { - return .sendMessage(message: response.bytes, promise: response.promise) + return .sendFrame(frame: response.bytes, promise: response.promise) } else { return .noMoreMessages } @@ -1382,7 +1382,7 @@ extension GRPCStreamStateMachine { let response = try state.framer?.next(compressor: state.compressor) self.state = .clientClosedServerClosed(state) if let response { - return .sendMessage(message: response.bytes, promise: response.promise) + return .sendFrame(frame: response.bytes, promise: response.promise) } else { return .noMoreMessages } diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index 3364ba9be..e51a42d20 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -177,8 +177,8 @@ extension GRPCServerStreamHandler { do { loop: while true { - switch try self.stateMachine.nextOutboundMessage() { - case .sendMessage(let byteBuffer, let promise): + switch try self.stateMachine.nextOutboundFrame() { + case .sendFrame(let byteBuffer, let promise): self.flushPending = true context.write( self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))), diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 195c9ff8f..324d47fdb 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -625,7 +625,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client is not open yet.") @@ -636,7 +636,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { for targetState in [TargetStateMachineState.clientOpenServerIdle, .clientOpenServerOpen] { var stateMachine = self.makeClientStateMachine(targetState: targetState) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) @@ -646,12 +646,12 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 42, 42, // original message ] XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil) + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil) ) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) } } @@ -661,14 +661,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { compressionEnabled: true ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(request, .sendMessage(message: framedMessage, promise: nil)) + XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { @@ -677,26 +677,26 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { compressionEnabled: true ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(request, .sendMessage(message: framedMessage, promise: nil)) + XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerClosed) // No more messages to send - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) // Queue a message, but assert the action is .noMoreMessages nevertheless, // because the server is closed. XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerIdle() throws { @@ -708,16 +708,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Make sure that getting the next outbound message _does_ return the message // we have enqueued. - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(request, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) + XCTAssertEqual(request, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerOpen() throws { @@ -729,16 +729,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Make sure that getting the next outbound message _does_ return the message // we have enqueued. - let request = try stateMachine.nextOutboundMessage() + let request = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(request, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) + XCTAssertEqual(request, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { @@ -754,7 +754,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Even though we have enqueued a message, don't send it, because the server // is closed. - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } // - MARK: Next inbound message @@ -888,16 +888,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ) // Client sends messages - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) try stateMachine.send(message: message, promise: nil) XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(message: framedMessage, promise: nil) + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessage, promise: nil) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) // Server sends response XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -933,7 +933,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -955,17 +955,17 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ) // Client sends messages and ends - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) XCTAssertNoThrow(try stateMachine.send(message: message, promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(message: framedMessage, promise: nil) + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessage, promise: nil) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) // Server sends initial metadata let serverInitialHeadersAction = try stateMachine.receive( @@ -1012,7 +1012,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -1034,16 +1034,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ) // Client sends messages - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) try stateMachine.send(message: message, promise: nil) XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(message: framedMessage, promise: nil) + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessage, promise: nil) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) // Server sends initial metadata let serverInitialHeadersAction = try stateMachine.receive( @@ -1093,7 +1093,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata) ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } } @@ -1883,7 +1883,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1895,7 +1895,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1907,7 +1907,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1917,20 +1917,20 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { func testNextOutboundMessageWhenClientOpenAndServerOpen() throws { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) } func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws { @@ -1939,14 +1939,14 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { compressionEnabled: true ) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let framedMessage = try self.frameMessage(originalMessage, compress: true) - XCTAssertEqual(response, .sendMessage(message: framedMessage, promise: nil)) + XCTAssertEqual(response, .sendFrame(frame: framedMessage, promise: nil)) } func testNextOutboundMessageWhenClientOpenAndServerClosed() throws { @@ -1961,16 +1961,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) ) - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerIdle() throws { @@ -1978,7 +1978,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.nextOutboundMessage() + try stateMachine.nextOutboundFrame() ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is not open yet.") @@ -1999,7 +1999,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Make sure that getting the next outbound message _does_ return the message // we have enqueued. - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes @@ -2009,10 +2009,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 43, 43, // original message ] - XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) } func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { @@ -2029,16 +2029,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // We have enqueued a message, make sure we return it even though server is closed, // because we haven't yet drained all of the pending messages. - let response = try stateMachine.nextOutboundMessage() + let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ 0, // compression flag: unset 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ] - XCTAssertEqual(response, .sendMessage(message: ByteBuffer(bytes: expectedBytes), promise: nil)) + XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)) // And then make sure that nothing else is returned anymore - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } // - MARK: Next inbound message @@ -2201,7 +2201,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) try stateMachine.send(message: firstResponse, promise: firstPromise) try stateMachine.send(message: secondResponse, promise: secondPromise) @@ -2210,8 +2210,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) guard - case .sendMessage(let nextOutboundByteBuffer, let nextOutboundPromise) = - try stateMachine.nextOutboundMessage() + case .sendFrame(let nextOutboundByteBuffer, let nextOutboundPromise) = + try stateMachine.nextOutboundFrame() else { XCTFail("Should have received .sendMessage") return @@ -2235,7 +2235,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) XCTAssertEqual(response, ["grpc-status": "0"]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -2282,15 +2282,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Server sends response let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) try stateMachine.send(message: firstResponse, promise: nil) try stateMachine.send(message: secondResponse, promise: nil) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(message: framedMessages, promise: nil) + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessages, promise: nil) ) // Server ends @@ -2300,7 +2300,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) XCTAssertEqual(response, ["grpc-status": "0"]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -2347,15 +2347,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Server sends response let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) try stateMachine.send(message: firstResponse, promise: nil) try stateMachine.send(message: secondResponse, promise: nil) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) XCTAssertEqual( - try stateMachine.nextOutboundMessage(), - .sendMessage(message: framedMessages, promise: nil) + try stateMachine.nextOutboundFrame(), + .sendFrame(frame: framedMessages, promise: nil) ) // Server ends @@ -2365,7 +2365,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) XCTAssertEqual(response, ["grpc-status": "0"]) - XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) + XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } } @@ -2405,17 +2405,17 @@ extension XCTestCase { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension GRPCStreamStateMachine.OnNextOutboundMessage: Equatable { +extension GRPCStreamStateMachine.OnNextOutboundFrame: Equatable { public static func == ( - lhs: GRPCStreamStateMachine.OnNextOutboundMessage, - rhs: GRPCStreamStateMachine.OnNextOutboundMessage + lhs: GRPCStreamStateMachine.OnNextOutboundFrame, + rhs: GRPCStreamStateMachine.OnNextOutboundFrame ) -> Bool { switch (lhs, rhs) { case (.noMoreMessages, .noMoreMessages): return true case (.awaitMoreMessages, .awaitMoreMessages): return true - case (.sendMessage(let lhsMessage, _), .sendMessage(let rhsMessage, _)): + case (.sendFrame(let lhsMessage, _), .sendFrame(let rhsMessage, _)): // Note that we're not comparing the EventLoopPromises here, as they're // not Equatable. This is fine though, since we only use this in tests. return lhsMessage == rhsMessage From a1f0fff4983e3d9186e8ba0458dc2b964ba239de Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:02:09 +0000 Subject: [PATCH 07/10] Store pending trailers and their promise as a tuple in server stream handler --- .../Server/GRPCServerStreamHandler.swift | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index e51a42d20..f7b93cb0d 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -34,8 +34,8 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler { // We buffer the final status + trailers to avoid reordering issues (i.e., // if there are messages still not written into the channel because flush has // not been called, but the server sends back trailers). - private var pendingTrailers: HTTP2Frame.FramePayload? - private var pendingTrailersPromise: EventLoopPromise? + private var pendingTrailers: + (trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise?)? init( scheme: Scheme, @@ -160,8 +160,7 @@ extension GRPCServerStreamHandler { do { let headers = try self.stateMachine.send(status: status, metadata: metadata) let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true)) - self.pendingTrailers = response - self.pendingTrailersPromise = promise + self.pendingTrailers = (response, promise) } catch { context.fireErrorCaught(error) promise?.fail(error) @@ -186,13 +185,13 @@ extension GRPCServerStreamHandler { ) case .noMoreMessages: - if let pendingTrailers = self.pendingTrailers, - let pendingTrailersPromise = self.pendingTrailersPromise - { + if let pendingTrailers = self.pendingTrailers { self.flushPending = true self.pendingTrailers = nil - self.pendingTrailersPromise = nil - context.write(self.wrapOutboundOut(pendingTrailers), promise: pendingTrailersPromise) + context.write( + self.wrapOutboundOut(pendingTrailers.trailers), + promise: pendingTrailers.promise + ) } break loop From 10258396b5e151d339c4bd7c83268915cebecffd Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:11:26 +0000 Subject: [PATCH 08/10] Check promises are chained in framer tests --- .../GRPCMessageFramerTests.swift | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift index 6d61af763..bf9696e73 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift @@ -15,6 +15,7 @@ */ import NIOCore +import NIOEmbedded import XCTest @testable import GRPCHTTP2Core @@ -74,26 +75,47 @@ final class GRPCMessageFramerTests: XCTestCase { func testMultipleWrites() throws { var framer = GRPCMessageFramer() - - let messages = 100 - for _ in 0 ..< messages { - framer.append(Array(repeating: 42, count: 128), promise: nil) + let eventLoop = EmbeddedEventLoop() + + // Create 100 messages and link a different promise with each of them. + let messagesCount = 100 + var promises = [EventLoopPromise]() + promises.reserveCapacity(messagesCount) + for _ in 0 ..< messagesCount { + let promise = eventLoop.makePromise(of: Void.self) + promises.append(promise) + framer.append(Array(repeating: 42, count: 128), promise: promise) } - var buffer = try XCTUnwrap(framer.next()).bytes - for _ in 0 ..< messages { + let nextFrame = try XCTUnwrap(framer.next()) + + // Assert the messages have been framed all together in the same frame. + var buffer = nextFrame.bytes + for _ in 0 ..< messagesCount { let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertFalse(compressed) XCTAssertEqual(length, 128) XCTAssertEqual(buffer.readSlice(length: Int(length)), ByteBuffer(repeating: 42, count: 128)) } - XCTAssertEqual(buffer.readableBytes, 0) - // No more bufers. + // Assert the promise returned from the framer is the promise linked to the + // first message appended to the framer. + let returnedPromise = nextFrame.promise + XCTAssertEqual(returnedPromise?.futureResult, promises.first?.futureResult) + + // Succeed the returned promise to simulate a write into the channel + // succeeding, and assert that all other promises have been chained and are + // also succeeded as a result. + returnedPromise?.succeed() + XCTAssertEqual(promises.count, messagesCount) + for promise in promises { + try promise.futureResult.assertSuccess().wait() + } + + // No more frames. XCTAssertNil(try framer.next()) } - } extension ByteBuffer { From bc7a0df76ad279834cae3147fbe4c0b6c212c9d6 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:14:15 +0000 Subject: [PATCH 09/10] Replace nil comparison with if let --- Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index 042ef6adb..29da4b7dc 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -78,10 +78,10 @@ struct GRPCMessageFramer { var pendingWritePromise: EventLoopPromise? while let message = self.pendingMessages.pop() { try self.encode(message.bytes, compressor: compressor) - if pendingWritePromise == nil { - pendingWritePromise = message.promise + if let existingPendingWritePromise = pendingWritePromise { + existingPendingWritePromise.futureResult.cascade(to: message.promise) } else { - pendingWritePromise?.futureResult.cascade(to: message.promise) + pendingWritePromise = message.promise } } From 722fd317beb3da5a9d5ebfef4a18e80a3cd009e5 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:52:59 +0000 Subject: [PATCH 10/10] Add missing promise failures --- Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift | 3 ++- Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index d276961ed..5be8f1025 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -145,6 +145,7 @@ extension GRPCClientStreamHandler { let headers = try self.stateMachine.send(metadata: metadata) context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise) } catch { + promise?.fail(error) context.fireErrorCaught(error) } @@ -152,8 +153,8 @@ extension GRPCClientStreamHandler { do { try self.stateMachine.send(message: message, promise: promise) } catch { - context.fireErrorCaught(error) promise?.fail(error) + context.fireErrorCaught(error) } } } diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index f7b93cb0d..14c576c20 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -145,14 +145,15 @@ extension GRPCServerStreamHandler { let headers = try self.stateMachine.send(metadata: metadata) context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise) } catch { - context.fireErrorCaught(error) promise?.fail(error) + context.fireErrorCaught(error) } case .message(let message): do { try self.stateMachine.send(message: message, promise: promise) } catch { + promise?.fail(error) context.fireErrorCaught(error) } @@ -162,8 +163,8 @@ extension GRPCServerStreamHandler { let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true)) self.pendingTrailers = (response, promise) } catch { - context.fireErrorCaught(error) promise?.fail(error) + context.fireErrorCaught(error) } } }